Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8967 Flaky test kafka.api.SaslSslAdminIntegrationTest.testCreat… #8137

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -291,6 +291,9 @@ default CreateAclsResult createAcls(Collection<AclBinding> acls) {
* If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
* no changes will be made.
* <p>
* Note that ACLs are stored in ZooKeeper and they are propagated to the brokers asynchronously so there may be a
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add document to highlight the behavior of syncing acls to all brokers.

* delay before the change takes effect even after the command returns.
* <p>
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param acls The ACLs to create
Expand Down
Expand Up @@ -119,24 +119,24 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava)
assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala)
results.all.get()
waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
waitForDescribeAcls(acl2.toFilter, Set(acl2))
waitForDescribeAcls(transactionalIdAcl.toFilter, Set(transactionalIdAcl))

val filterA = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, null, PatternType.LITERAL), AccessControlEntryFilter.ANY)
val filterB = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", PatternType.LITERAL), AccessControlEntryFilter.ANY)
val filterC = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TRANSACTIONAL_ID, null, PatternType.LITERAL), AccessControlEntryFilter.ANY)

waitForDescribeAcls(client, filterA, Set(groupAcl))
waitForDescribeAcls(client, filterC, Set(transactionalIdAcl))
waitForDescribeAcls(filterA, Set(groupAcl))
waitForDescribeAcls(filterC, Set(transactionalIdAcl))

val results2 = client.deleteAcls(List(filterA, filterB, filterC).asJava, new DeleteAclsOptions())
assertEquals(Set(filterA, filterB, filterC), results2.values.keySet.asScala)
assertEquals(Set(groupAcl), results2.values.get(filterA).get.values.asScala.map(_.binding).toSet)
assertEquals(Set(transactionalIdAcl), results2.values.get(filterC).get.values.asScala.map(_.binding).toSet)
assertEquals(Set(acl2), results2.values.get(filterB).get.values.asScala.map(_.binding).toSet)

waitForDescribeAcls(client, filterB, Set())
waitForDescribeAcls(client, filterC, Set())
waitForDescribeAcls(filterB, Set())
waitForDescribeAcls(filterC, Set())
}

@Test
Expand Down Expand Up @@ -297,8 +297,8 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
}, "timed out waiting for createAcls to " + (if (expectAuth) "succeed" else "fail"))
if (expectAuth) {
waitForDescribeAcls(client, fooAcl.toFilter, Set(fooAcl))
waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
waitForDescribeAcls(fooAcl.toFilter, Set(fooAcl))
waitForDescribeAcls(transactionalIdAcl.toFilter, Set(transactionalIdAcl))
}
TestUtils.waitUntilTrue(() => {
val result = client.deleteAcls(List(fooAcl.toFilter, transactionalIdAcl.toFilter).asJava, new DeleteAclsOptions)
Expand All @@ -324,8 +324,8 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
}, "timed out waiting for deleteAcls to " + (if (expectAuth) "succeed" else "fail"))
if (expectAuth) {
waitForDescribeAcls(client, fooAcl.toFilter, Set.empty)
waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set.empty)
waitForDescribeAcls(fooAcl.toFilter, Set.empty)
waitForDescribeAcls(transactionalIdAcl.toFilter, Set.empty)
}
}

Expand Down Expand Up @@ -393,6 +393,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu

client = Admin.create(createConfig())
client.createAcls(List(denyAcl).asJava, new CreateAclsOptions()).all().get()
waitForDescribeAcls(denyAcl.toFilter, Set(denyAcl))

val topics = Seq(topic1, topic2)
val configsOverride = Map(LogConfig.SegmentBytesProp -> "100000").asJava
Expand Down Expand Up @@ -456,18 +457,20 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
configEntries
}

private def waitForDescribeAcls(client: Admin, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
private def waitForDescribeAcls(filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
var lastResults: util.Collection[AclBinding] = null
TestUtils.waitUntilTrue(() => {
lastResults = client.describeAcls(filter).values.get()
acls == lastResults.asScala.toSet
acls == lastResults.asScala.toSet &&
// make sure all brokers have received the ACLs update from zookeeper notification
servers.forall(_.authorizer.forall(_.acls(filter).asScala.toSet == acls))
}, s"timed out waiting for ACLs $acls.\nActual $lastResults")
}

private def ensureAcls(bindings: Set[AclBinding]): Unit = {
client.createAcls(bindings.asJava).all().get()

bindings.foreach(binding => waitForDescribeAcls(client, binding.toFilter, Set(binding)))
bindings.foreach(binding => waitForDescribeAcls(binding.toFilter, Set(binding)))
}

private def getAcls(allTopicAcls: AclBindingFilter) = {
Expand Down