From 7bc0720a09de4afcb51db4c03630337f80aefd4e Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Tue, 8 Jun 2021 12:53:08 -0400 Subject: [PATCH] KAFKA-12916: Add new AUTO_CREATE ACL for auto topic creation This will allow authorizing a user to auto create a topic with cluster defaults but prevent manual creation with overriden settings --- .../java/org/apache/kafka/common/acl/AclOperation.java | 7 ++++++- .../java/org/apache/kafka/common/acl/AclOperationTest.java | 3 ++- .../scala/kafka/security/authorizer/AclAuthorizer.scala | 1 + core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++-- .../unit/kafka/security/authorizer/AclAuthorizerTest.scala | 6 ++++-- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 2 +- 6 files changed, 16 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java index 671069775ca0..44f81059af26 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java @@ -106,7 +106,12 @@ public enum AclOperation { /** * IDEMPOTENT_WRITE operation. */ - IDEMPOTENT_WRITE((byte) 12); + IDEMPOTENT_WRITE((byte) 12), + + /** + * AUTO_CREATE operation. + */ + AUTO_CREATE((byte) 13); // Note: we cannot have more than 30 ACL operations without modifying the format used // to describe ACL operations in MetadataResponse. diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java index c807e2be95a7..f1021598cff8 100644 --- a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java @@ -48,7 +48,8 @@ private static class AclOperationTestInfo { new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false), new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false), new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false), - new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false) + new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false), + new AclOperationTestInfo(AclOperation.AUTO_CREATE, 13, "auto_create", false) }; @Test diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index 5f701d8d4098..770ffd5da94a 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -477,6 +477,7 @@ class AclAuthorizer extends Authorizer with Logging { val allowOps = operation match { case DESCRIBE => Set[AclOperation](DESCRIBE, READ, WRITE, DELETE, ALTER) case DESCRIBE_CONFIGS => Set[AclOperation](DESCRIBE_CONFIGS, ALTER_CONFIGS) + case AUTO_CREATE => Set[AclOperation](AUTO_CREATE, CREATE) case _ => Set[AclOperation](operation) } allowOps.exists(operation => matchingAclExists(operation, resource, principal, host, ALLOW, acls)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bafc0411253b..40715bc9c270 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1155,8 +1155,8 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { - if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) { - val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, CREATE, TOPIC, + if (!authHelper.authorize(request.context, AUTO_CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) { + val authorizedForCreateTopics = authHelper.filterByAuthorized(request.context, AUTO_CREATE, TOPIC, nonExistingTopics)(identity) unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics) authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics) diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala index fa201db461d6..21c62aad5fa5 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala @@ -484,9 +484,9 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest { @Test def testAclInheritance(): Unit = { testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, - CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE)) + CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, AUTO_CREATE)) testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, - CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE)) + CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, AUTO_CREATE)) testImplicationsOfAllow(READ, Set(DESCRIBE)) testImplicationsOfAllow(WRITE, Set(DESCRIBE)) testImplicationsOfAllow(DELETE, Set(DESCRIBE)) @@ -494,6 +494,8 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest { testImplicationsOfDeny(DESCRIBE, Set()) testImplicationsOfAllow(ALTER_CONFIGS, Set(DESCRIBE_CONFIGS)) testImplicationsOfDeny(DESCRIBE_CONFIGS, Set()) + testImplicationsOfAllow(CREATE, Set(AUTO_CREATE)) + testImplicationsOfAllow(AUTO_CREATE, Set()) } private def testImplicationsOfAllow(parentOp: AclOperation, allowedOps: Set[AclOperation]): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 9a0bdb04211c..586a6976492f 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -966,7 +966,7 @@ class KafkaApisTest { topicName, AuthorizationResult.ALLOWED) if (enableAutoTopicCreation) - authorizeResource(authorizer, AclOperation.CREATE, ResourceType.CLUSTER, + authorizeResource(authorizer, AclOperation.AUTO_CREATE, ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED, logIfDenied = false) val topicConfigOverride = mutable.Map.empty[String, String]