Skip to content

Commit

Permalink
KAFKA-12916: Add new AUTO_CREATE ACL for auto topic creation
Browse files Browse the repository at this point in the history
This will allow authorizing a user to auto create a topic with cluster
defaults but prevent manual creation with overriden settings
  • Loading branch information
cshannon committed Jun 8, 2021
1 parent 1dadb6d commit 7bc0720
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 7 deletions.
Expand Up @@ -106,7 +106,12 @@ public enum AclOperation {
/**
* IDEMPOTENT_WRITE operation.
*/
IDEMPOTENT_WRITE((byte) 12);
IDEMPOTENT_WRITE((byte) 12),

/**
* AUTO_CREATE operation.
*/
AUTO_CREATE((byte) 13);

// Note: we cannot have more than 30 ACL operations without modifying the format used
// to describe ACL operations in MetadataResponse.
Expand Down
Expand Up @@ -48,7 +48,8 @@ private static class AclOperationTestInfo {
new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false),
new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false),
new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false),
new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false)
new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false),
new AclOperationTestInfo(AclOperation.AUTO_CREATE, 13, "auto_create", false)
};

@Test
Expand Down
Expand Up @@ -477,6 +477,7 @@ class AclAuthorizer extends Authorizer with Logging {
val allowOps = operation match {
case DESCRIBE => Set[AclOperation](DESCRIBE, READ, WRITE, DELETE, ALTER)
case DESCRIBE_CONFIGS => Set[AclOperation](DESCRIBE_CONFIGS, ALTER_CONFIGS)
case AUTO_CREATE => Set[AclOperation](AUTO_CREATE, CREATE)
case _ => Set[AclOperation](operation)
}
allowOps.exists(operation => matchingAclExists(operation, resource, principal, host, ALLOW, acls))
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Expand Up @@ -1155,8 +1155,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedTopics.nonEmpty) {
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
if (!authHelper.authorize(request.context, AUTO_CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, AUTO_CREATE, TOPIC,
nonExistingTopics)(identity)
unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics)
authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics)
Expand Down
Expand Up @@ -484,16 +484,18 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
@Test
def testAclInheritance(): Unit = {
testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE))
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, AUTO_CREATE))
testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE))
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, AUTO_CREATE))
testImplicationsOfAllow(READ, Set(DESCRIBE))
testImplicationsOfAllow(WRITE, Set(DESCRIBE))
testImplicationsOfAllow(DELETE, Set(DESCRIBE))
testImplicationsOfAllow(ALTER, Set(DESCRIBE))
testImplicationsOfDeny(DESCRIBE, Set())
testImplicationsOfAllow(ALTER_CONFIGS, Set(DESCRIBE_CONFIGS))
testImplicationsOfDeny(DESCRIBE_CONFIGS, Set())
testImplicationsOfAllow(CREATE, Set(AUTO_CREATE))
testImplicationsOfAllow(AUTO_CREATE, Set())
}

private def testImplicationsOfAllow(parentOp: AclOperation, allowedOps: Set[AclOperation]): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Expand Up @@ -966,7 +966,7 @@ class KafkaApisTest {
topicName, AuthorizationResult.ALLOWED)

if (enableAutoTopicCreation)
authorizeResource(authorizer, AclOperation.CREATE, ResourceType.CLUSTER,
authorizeResource(authorizer, AclOperation.AUTO_CREATE, ResourceType.CLUSTER,
Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED, logIfDenied = false)

val topicConfigOverride = mutable.Map.empty[String, String]
Expand Down

0 comments on commit 7bc0720

Please sign in to comment.