New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13889: Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL #12160
Conversation
57a6b08
to
3ef6626
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the improvements @andymg3
if (image.acls().containsKey(record.id())) { | ||
changes.put(record.id(), Optional.empty()); | ||
} else if (changes.containsKey(record.id())) { | ||
changes.remove(record.id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, the Jira mentions that we don't have "remove acl" to be idempotent. "Remove acl" is idempotent if they are process in the same batch/delta.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mind clarifying? With the exception thrown below I think ACL removal is still not idempotent. If an ACL is added, then removed, then removed again, then the exception below will be thrown as it would have been removed from the Map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct. I missed the else if
.
} else if (changes.containsKey(record.id())) { | ||
changes.remove(record.id()); | ||
} else { | ||
throw new RuntimeException("Failed to find existing ACL with ID " + record.id() + " in either image or changes"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we throw an IllegalStateException
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that exception is more appropriate. I went with RuntimeException
because thats what we throw in AclControlManager
(https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L202) when we come across a removal unexpectedly. My thought process was to be consistent. Having said that, they are two different components so I'm fine updating it here to IllegalStateException
. Do you think we should throw the same exception in AclControlManager
?
@@ -61,7 +61,13 @@ public void replay(AccessControlEntryRecord record) { | |||
} | |||
|
|||
public void replay(RemoveAccessControlEntryRecord record) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we document this method? Seems subtle enough to warrant documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add a java doc to this method.
changes.remove(record.id()); | ||
} else { | ||
throw new RuntimeException("Failed to find existing ACL with ID " + record.id() + " in either image or changes"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is for:
public Map<Uuid, Optional<StandardAcl>> changes() {
return changes;
}
Can we document this method? I am particularly interested on the return type. I get the impression that a value of Optional.empty
should interpreted as a delete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct and that's the existing behavior. I'll add a comment clarifying that.
import java.util.Optional; | ||
|
||
@Timeout(40) | ||
public class AclsDeltaTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am okay moving these test to AclImageTest
. The two types are closely related. We already have "delta" tests in the "image" suite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm yeah that's true. I guess I don't see a big downside of having a separate test class as obviously it is different to some degree. So will keep the separate test class unless you have concerns.
|
||
delta.replay(inputAclRecord); | ||
assertTrue(delta.changes().containsKey(aclId)); | ||
assertTrue(delta.changes().get(aclId).isPresent()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can remove this since you are implicitly testing this in the line below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True but delta.changes().get(aclId)
could return null
in which case we'd throw a NPE which would be a bit more challenging to debug from the test logs in my view so will leave as is unless you have concerns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree but now that you have assertEquals(Optional.of(testStandardAcl()), delta.changes().get(aclId));
there shouldn't be a NPE, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I think I can get rid of those two above assertions now. I just tested this with an invalid input to the Map and saw:
AclsDeltaTest > testRemovesDeleteIfNotInImage() FAILED
org.opentest4j.AssertionFailedError: expected: <Optional[StandardAcl(resourceType=ANY, resourceName=foo, patternType=ANY, principal=User:user, host=host, operation=ANY, permissionType=ANY)]> but was: <null>
So we should be safe from seeing a NPE. Will update.
delta.replay(inputAclRecord); | ||
assertTrue(delta.changes().containsKey(aclId)); | ||
assertTrue(delta.changes().get(aclId).isPresent()); | ||
assertEquals(testStandardAcl(), delta.changes().get(aclId).get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove the get()
and compare it against Optional.of(testStandardAcl())
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree that's cleaner and it prevents NPEs if delta.changes().get(aclId)
returns null
} | ||
|
||
@Test | ||
public void testThrowsExceptionOnInvalidState() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also test with an AclsImage
that has ACLs but for the one being removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update. Just one minor comment.
if (image.acls().containsKey(record.id())) { | ||
changes.put(record.id(), Optional.empty()); | ||
} else if (changes.containsKey(record.id())) { | ||
changes.remove(record.id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct. I missed the else if
.
|
||
delta.replay(inputAclRecord); | ||
assertTrue(delta.changes().containsKey(aclId)); | ||
assertTrue(delta.changes().get(aclId).isPresent()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree but now that you have assertEquals(Optional.of(testStandardAcl()), delta.changes().get(aclId));
there shouldn't be a NPE, right?
…kly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. The test failures seem unrelated:
Build / JDK 8 and Scala 2.12 / testSeekAndCommitWithBrokerFailures() – kafka.api.ConsumerBounceTest3s
Build / ARM / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest
JIRA
https://issues.apache.org/jira/browse/KAFKA-13889
Description
AclsDelta
to handleACCESS_CONTROL_ENTRY_RECORD
quickly followed byREMOVE_ACCESS_CONTROL_ENTRY_RECORD
for same ACLchanges
Map. This could override a creation that might have just happened. This is an issue because inBrokerMetadataPublisher
this results in us making aremoveAcl
call which finally results in https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L203 being executed and this code throws an exception if the ACL isnt in the Map yet. If theACCESS_CONTROL_ENTRY_RECORD
event never got processed byBrokerMetadataPublisher
then the ACL wont be in the Map yet.changes
Map if the ACL doesnt exist in the image yet.Testing
Committer Checklist (excluded from commit message)