-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5638: Improve the Required ACL of ListGroups API (KIP-231) #5352
Conversation
47406fc
to
deb839b
Compare
cc @hachikuji @ewencp for review. Thanks! |
sendResponseMaybeThrottle(request, requestThrottleMs => | ||
request.body[ListGroupsRequest].getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) | ||
else { | ||
val filteredGroups = groups.filter(group => authorize(request.session, Describe, new Resource(Group, group.groupId, LITERAL))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just return CLUSTER_AUTHORIZATION_FAILED error, if filteredGroups is empty? This may avoid getAcls().filter iteration above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think that works, and should simply the error condition. Updated the PR. Thanks!
8321349
to
8cee70f
Compare
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Left a few minor comments.
val filteredGroups = groups.filter(group => authorize(request.session, Describe, new Resource(Group, group.groupId, LITERAL))) | ||
if (filteredGroups.isEmpty) { | ||
sendResponseMaybeThrottle(request, requestThrottleMs => | ||
request.body[ListGroupsRequest].getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we return an empty list of groups and no error instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we could return an empty list instead of throwing an error. I guess one reason I went with the error was for compatibility too. Prior to this, if the user did not have describe cluster permission they received this error. With your suggestion of returning an empty we never return an error. I thought this might confuse existing users.
In the latest commit I went with your suggestion.
@@ -1238,14 +1238,20 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
} | |||
|
|||
def handleListGroupsRequest(request: RequestChannel.Request) { | |||
if (!authorize(request.session, Describe, Resource.ClusterResource)) { | |||
val (error, groups) = groupCoordinator.handleListGroups() | |||
// the alternative where at least some Describe Group ACLs for current principal is needed for running ListGroups |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be worth mentioning in this comment that the cluster permission is retained for compatibility.
val otherConsumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group2, securityProtocol = SecurityProtocol.PLAINTEXT) | ||
otherConsumer.subscribe(Collections.singleton(topic)) | ||
consumeRecords(otherConsumer) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unneeded newline
removeAllAcls() | ||
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), Resource.ClusterResource) | ||
// it should list both groups (due to cluster describe permission) | ||
assertEquals(2, adminClient.listConsumerGroups().all().get().size()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we could replace this with an explicit comparison using a set perhaps? Same below.
// write some record to the topic | ||
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource) | ||
sendRecords(1, tp) | ||
removeAllAcls() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: seems we could skip this since we remove all acls before describing the group anyway
adf1075
to
93ef0d2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji thanks for reviewing the PR. I submitted a patch to address your feedback. Also, there is a response inline.
val filteredGroups = groups.filter(group => authorize(request.session, Describe, new Resource(Group, group.groupId, LITERAL))) | ||
if (filteredGroups.isEmpty) { | ||
sendResponseMaybeThrottle(request, requestThrottleMs => | ||
request.body[ListGroupsRequest].getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we could return an empty list instead of throwing an error. I guess one reason I went with the error was for compatibility too. Prior to this, if the user did not have describe cluster permission they received this error. With your suggestion of returning an empty we never return an error. I thought this might confuse existing users.
In the latest commit I went with your suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. Left a couple additional comments.
new ListGroupsResponse(requestThrottleMs, error, groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }.asJava)) | ||
else { | ||
val filteredGroups = groups.filter(group => authorize(request.session, Describe, new Resource(Group, group.groupId, LITERAL))) | ||
if (filteredGroups.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the check? I think the else
case works if filteredGroups
is empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, will update. Thanks.
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) | ||
// it should list only one group now | ||
val groupList = adminClient.listConsumerGroups().all().get().asScala.toList | ||
assertTrue(groupList.length == 1 && groupList.head.groupId == group) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: It's generally preferred to use assertEquals
. The advantage is that this allows junit to provide a more helpful message upon failure (we can see the value that failed the comparison). Similarly for the couple assertTrue
uses below, we can compare with an empty list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Will update in the next patch.
93ef0d2
to
6fb99d3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji Thanks for taking another look. Submitted a patch to address your feedback.
new ListGroupsResponse(requestThrottleMs, error, groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }.asJava)) | ||
else { | ||
val filteredGroups = groups.filter(group => authorize(request.session, Describe, new Resource(Group, group.groupId, LITERAL))) | ||
if (filteredGroups.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, will update. Thanks.
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) | ||
// it should list only one group now | ||
val groupList = adminClient.listConsumerGroups().all().get().asScala.toList | ||
assertTrue(groupList.length == 1 && groupList.head.groupId == group) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Will update in the next patch.
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the patch.
…che#5352) Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
KIP-231
Committer Checklist (excluded from commit message)