From d70378d1cf144f8e12bed7c50beaa9497e0fd36e Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Mon, 5 Jun 2017 18:24:51 -0700 Subject: [PATCH 1/9] KAFKA-5292. Fix authorization checks in AdminClient --- .../apache/kafka/clients/NetworkClient.java | 16 +- .../apache/kafka/common/acl/AclOperation.java | 18 ++ .../common/requests/AbstractResponse.java | 3 + .../kafka/common/resource/Resource.java | 10 + .../kafka/common/resource/ResourceType.java | 7 +- .../main/scala/kafka/admin/AclCommand.scala | 2 +- .../controller/ControllerChannelManager.scala | 2 +- .../scala/kafka/security/auth/Operation.scala | 8 +- .../kafka/security/auth/PermissionType.scala | 8 +- .../kafka/security/auth/ResourceType.scala | 28 ++- .../security/auth/SimpleAclAuthorizer.scala | 30 +-- .../main/scala/kafka/server/KafkaApis.scala | 49 +++-- .../api/AdminClientIntegrationTest.scala | 2 +- .../SaslSslAdminClientIntegrationTest.scala | 179 ++++++++++++++++-- .../kafka/security/auth/OperationTest.scala | 31 +-- .../security/auth/PermissionTypeTest.scala | 21 +- .../security/auth/ResourceTypeTest.scala | 19 ++ 17 files changed, 350 insertions(+), 83 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 1d4fe58949a17..34b9695c6d678 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -505,15 +505,18 @@ public Node leastLoadedNode(long now) { } public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { - return parseResponseMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0); + return parseResponseMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0, null); } private static AbstractResponse parseResponseMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, - Sensor throttleTimeSensor, long now) { + Sensor throttleTimeSensor, long now, StringBuffer structStringBuffer) { ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer); // Always expect the response version id to be the same as the request version id ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer); + if (structStringBuffer != null) { + structStringBuffer.append(responseBody.toString()); + } correlate(requestHeader, responseHeader); if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME)) throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME), now); @@ -604,8 +607,13 @@ private void handleCompletedReceives(List responses, long now) { for (NetworkReceive receive : this.selector.completedReceives()) { String source = receive.source(); InFlightRequest req = inFlightRequests.completeNext(source); - AbstractResponse body = parseResponseMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, throttleTimeSensor, now); - log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body); + StringBuffer structStringBuffer = log.isTraceEnabled() ? new StringBuffer() : null; + AbstractResponse body = parseResponseMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, + throttleTimeSensor, now, structStringBuffer); + if (log.isTraceEnabled()) { + log.trace("Completed receive from node {}, for key {}, received {}", req.destination, + req.header.apiKey(), structStringBuffer.toString()); + } if (req.isInternalRequest && body instanceof MetadataResponse) metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); else if (req.isInternalRequest && body instanceof ApiVersionsResponse) diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java index c63320d9cc4f6..7248c51f78381 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java @@ -22,6 +22,24 @@ /** * Represents an operation which an ACL grants or denies permission to perform. + * + * Some operations imply other operations. + * + * ALLOW ALL implies ALLOW everything + * DENY ALL implies DENY everything + * + * ALLOW READ implies ALLOW DESCRIBE + * ALLOW WRITE implies ALLOW DESCRIBE + * ALLOW DELETE implies ALLOW DESCRIBE + * + * ALLOW ALTER implies ALLOW DESCRIBE + * DENY DESCRIBE implies DENY ALTER + * + * ALLOW ALTER_CONFIGS implies ALLOW DESCRIBE_CONFIGS + * DENY DESCRIBE_CONFIGS implies DENY ALTER_CONFIGS + * + * ALLOW ALTER_CONFIGS on kafka-cluster implies ALLOW ALTER_CONFIGS on all topics. + * DENY DESCRIBE_CONFIGS on kafka-cluster implies DENY DESCRIBE_CONFIGS on all topics. */ public enum AclOperation { /** diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 99b35e8d0c4c1..1686976c4da86 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -125,4 +125,7 @@ public static AbstractResponse getResponse(ApiKeys apiKey, Struct struct) { } } + public String toString(short version) { + return toStruct(version).toString(); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java index 2883a036014c4..93a0cd12c5138 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java @@ -26,6 +26,16 @@ public class Resource { private final ResourceType resourceType; private final String name; + /** + * The name of the CLUSTER resource. + */ + public final static String CLUSTER_NAME = "kafka-cluster"; + + /** + * A resource representing the whole cluster. + */ + public final static Resource CLUSTER = new Resource(ResourceType.CLUSTER, CLUSTER_NAME); + public Resource(ResourceType resourceType, String name) { Objects.requireNonNull(resourceType); this.resourceType = resourceType; diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java index a1b7b2b26bc44..ca3ddaf32839b 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java @@ -53,7 +53,12 @@ public enum ResourceType { /** * A broker. */ - BROKER((byte) 5); + BROKER((byte) 5), + + /** + * A transactional ID. + */ + TRANSACTIONAL_ID((byte) 6); private final static HashMap CODE_TO_VALUE = new HashMap<>(); diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 8cbd8a62b47e5..4522135b5957c 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -33,7 +33,7 @@ object AclCommand { val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] ( Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All), Group -> Set(Read, Describe, All), - Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, All), + Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All), TransactionalId -> Set(Describe, Write, All) ) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 8f98a8c26c246..013164223cbf1 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -244,7 +244,7 @@ class RequestSendThread(val controllerId: Int, val response = clientResponse.responseBody stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" - .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString)) + .format(controllerId, controllerContext.epoch, response.toString(), brokerNode.toString)) if (callback != null) { callback(response) diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala index d3a25b5a07295..a13345ab332a2 100644 --- a/core/src/main/scala/kafka/security/auth/Operation.scala +++ b/core/src/main/scala/kafka/security/auth/Operation.scala @@ -81,13 +81,7 @@ object Operation { op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(","))) } - def fromJava(operation: AclOperation): Try[Operation] = { - try { - Success(fromString(operation.toString)) - } catch { - case throwable: Throwable => Failure(throwable) - } - } + def fromJava(operation: AclOperation): Operation = fromString(operation.toString.replaceAll("_", "")) def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, DescribeConfigs, IdempotentWrite, All) diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala index ec99ae4cfdde3..686c60b60d2a6 100644 --- a/core/src/main/scala/kafka/security/auth/PermissionType.scala +++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala @@ -46,13 +46,7 @@ object PermissionType { pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(","))) } - def fromJava(permissionType: AclPermissionType): Try[PermissionType] = { - try { - Success(fromString(permissionType.toString)) - } catch { - case throwable: Throwable => Failure(throwable) - } - } + def fromJava(permissionType: AclPermissionType): PermissionType = fromString(permissionType.toString) def values: Seq[PermissionType] = List(Allow, Deny) } diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala index 4deb23bc64ca3..2ab995bdd564b 100644 --- a/core/src/main/scala/kafka/security/auth/ResourceType.scala +++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala @@ -18,27 +18,41 @@ package kafka.security.auth import kafka.common.{BaseEnum, KafkaException} import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.resource.{ResourceType => JResourceType} -sealed trait ResourceType extends BaseEnum { def error: Errors } - -case object Cluster extends ResourceType { - val name = "Cluster" - val error = Errors.CLUSTER_AUTHORIZATION_FAILED +sealed trait ResourceType extends BaseEnum { + def error: Errors + def toJava: JResourceType } case object Topic extends ResourceType { val name = "Topic" val error = Errors.TOPIC_AUTHORIZATION_FAILED + val toJava = JResourceType.TOPIC } case object Group extends ResourceType { val name = "Group" val error = Errors.GROUP_AUTHORIZATION_FAILED + val toJava = JResourceType.GROUP +} + +case object Cluster extends ResourceType { + val name = "Cluster" + val error = Errors.CLUSTER_AUTHORIZATION_FAILED + val toJava = JResourceType.CLUSTER +} + +case object Broker extends ResourceType { + val name = "Broker" + val error = Errors.CLUSTER_AUTHORIZATION_FAILED // Same exception as Cluster, for now. + val toJava = JResourceType.BROKER } case object TransactionalId extends ResourceType { val name = "TransactionalId" val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED + val toJava = JResourceType.TRANSACTIONAL_ID } object ResourceType { @@ -48,5 +62,7 @@ object ResourceType { rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(","))) } - def values: Seq[ResourceType] = List(Cluster, Topic, Group, TransactionalId) + def values: Seq[ResourceType] = List(Topic, Group, Cluster, Broker, TransactionalId) + + def fromJava(operation: JResourceType): ResourceType = fromString(operation.toString.replaceAll("_", "")) } diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 0e78f524b05b4..58923d3289147 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -124,17 +124,25 @@ class SimpleAclAuthorizer extends Authorizer with Logging { val host = session.clientAddress.getHostAddress val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource)) - //check if there is any Deny acl match that would disallow this operation. - val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls) - - //if principal is allowed to read, write or delete we allow describe by default, the reverse does not apply to Deny. - val ops = if (Describe == operation) - Set[Operation](operation, Read, Write, Delete) - else - Set[Operation](operation) - - //now check if there is any allow acl that will allow this operation. - val allowMatch = ops.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls)) + // Check if there is any Deny acl match that would disallow this operation. + // If Describe is forbidden, Alter is forbidden as well. + // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance. + val denyOps = operation match { + case Alter => Set[Operation](Alter, Describe) + case AlterConfigs => Set[Operation](AlterConfigs, DescribeConfigs) + case _ => Set[Operation](operation) + } + val denyMatch = denyOps.exists(operation => aclMatch(operation, resource, principal, host, Deny, acls)) + + // Check if there are any Allow ACLs which would allow this operation. + // Allowing read, write, delete, or alter implies allowing describe. + // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance. + val allowOps = operation match { + case Describe => Set[Operation](Describe, Read, Write, Delete, Alter) + case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs) + case _ => Set[Operation](operation) + } + val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls)) //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users //when no acls are found or if no deny acls are found and at least one allow acls matches. diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6cff0e6c8238c..0930a02cfcb2a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1747,7 +1747,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleDescribeAcls(request: RequestChannel.Request): Unit = { - authorizeClusterAction(request) + authorizeClusterDescribe(request) val describeAclsRequest = request.body[DescribeAclsRequest] authorizer match { case None => @@ -1786,15 +1786,13 @@ class KafkaApis(val requestChannel: RequestChannel, case AdminResourceType.ANY => return Failure(new InvalidRequestException("Invalid ANY resource type")) case _ => {} } - var resourceType: ResourceType = null - try { - resourceType = ResourceType.fromString(filter.resourceFilter().resourceType().toString) + val resourceType: ResourceType = try { + ResourceType.fromJava(filter.resourceFilter.resourceType) } catch { case throwable: Throwable => return Failure(new InvalidRequestException("Invalid resource type")) } - var principal: KafkaPrincipal = null - try { - principal = KafkaPrincipal.fromString(filter.entryFilter().principal()) + val principal: KafkaPrincipal = try { + KafkaPrincipal.fromString(filter.entryFilter.principal) } catch { case throwable: Throwable => return Failure(new InvalidRequestException("Invalid principal")) } @@ -1803,18 +1801,20 @@ class KafkaApis(val requestChannel: RequestChannel, case AclOperation.ANY => return Failure(new InvalidRequestException("Invalid ANY operation type")) case _ => {} } - val operation = Operation.fromJava(filter.entryFilter().operation()) match { - case Failure(throwable) => return Failure(new InvalidRequestException(throwable.getMessage)) - case Success(op) => op + val operation: Operation = try { + Operation.fromJava(filter.entryFilter.operation) + } catch { + case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage)) } filter.entryFilter().permissionType() match { case AclPermissionType.UNKNOWN => new InvalidRequestException("Invalid UNKNOWN permission type") case AclPermissionType.ANY => new InvalidRequestException("Invalid ANY permission type") case _ => {} } - val permissionType = PermissionType.fromJava(filter.entryFilter.permissionType) match { - case Failure(throwable) => return Failure(new InvalidRequestException(throwable.getMessage)) - case Success(perm) => perm + val permissionType: PermissionType = try { + PermissionType.fromJava(filter.entryFilter.permissionType) + } catch { + case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage)) } return Success((Resource(resourceType, filter.resourceFilter().name()), Acl(principal, permissionType, filter.entryFilter().host(), operation))) @@ -1837,7 +1837,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleCreateAcls(request: RequestChannel.Request): Unit = { - authorizeClusterAction(request) + authorizeClusterAlter(request) val createAclsRequest = request.body[CreateAclsRequest] authorizer match { case None => @@ -1846,7 +1846,6 @@ class KafkaApis(val requestChannel: RequestChannel, new SecurityDisabledException("No Authorizer is configured on the broker."))) case Some(auth) => val errors = mutable.HashMap[Int, Throwable]() - val creations = ListBuffer[(Resource, Acl)]() for (i <- 0 until createAclsRequest.aclCreations.size) { val result = toScala(createAclsRequest.aclCreations.get(i).acl.toFilter) result match { @@ -1859,8 +1858,14 @@ class KafkaApis(val requestChannel: RequestChannel, if (resource.name.isEmpty) throw new InvalidRequestException("Invalid empty resource name") auth.addAcls(immutable.Set(acl), resource) + if (logger.isDebugEnabled) { + logger.debug("added acl " + acl + " to " + resource) + } } catch { - case throwable : Throwable => errors.put(i, throwable) + case throwable : Throwable => if (logger.isDebugEnabled) { + logger.debug("failed to add acl " + acl + " to " + resource, throwable) + } + errors.put(i, throwable) } } } @@ -1877,7 +1882,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleDeleteAcls(request: RequestChannel.Request): Unit = { - authorizeClusterAction(request) + authorizeClusterAlter(request) val deleteAclsRequest = request.body[DeleteAclsRequest] authorizer match { case None => @@ -2052,6 +2057,16 @@ class KafkaApis(val requestChannel: RequestChannel, throw new ClusterAuthorizationException(s"Request $request is not authorized.") } + def authorizeClusterAlter(request: RequestChannel.Request): Unit = { + if (!authorize(request.session, Alter, Resource.ClusterResource)) + throw new ClusterAuthorizationException(s"Request $request is not authorized.") + } + + def authorizeClusterDescribe(request: RequestChannel.Request): Unit = { + if (!authorize(request.session, Describe, Resource.ClusterResource)) + throw new ClusterAuthorizationException(s"Request $request is not authorized.") + } + private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) { sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs => sendResponse(request, createResponse(requestThrottleMs)) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 0a1d229563cbb..056a0257315db 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin._ import kafka.utils.{Logging, TestUtils, ZkUtils} import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.KafkaFuture -import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} +import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException} import org.apache.kafka.common.protocol.ApiKeys diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index 9cd86c3b29dec..50d8abd4da3fd 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -14,33 +14,69 @@ package kafka.api import java.io.File -import kafka.security.auth.SimpleAclAuthorizer +import kafka.admin.AclCommand +import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource} import org.apache.kafka.common.protocol.SecurityProtocol import kafka.server.KafkaConfig -import kafka.utils.{JaasTestUtils, TestUtils} -import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions} +import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils} +import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions, KafkaAdminClient} import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} -import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException} import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType} +import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.Assert.assertEquals import org.junit.{After, Assert, Before, Test} import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup { this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName()) - this.serverConfig.setProperty(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) + override def configureSecurityBeforeServersStart() { + val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName()) + authorizer.configure(this.configs.head.originals()) + authorizer.addAcls(Set(new AuthAcl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"), Allow, + AuthAcl.WildCardHost, All)), new AuthResource(Topic, "*")) + authorizer.addAcls(Set(clusterAcl(Allow, Create), + clusterAcl(Allow, Delete), + clusterAcl(Allow, ClusterAction), + clusterAcl(Allow, AlterConfigs), + clusterAcl(Allow, Alter)), + AuthResource.ClusterResource) + } + @Before override def setUp(): Unit = { startSasl(jaasSections(Seq("GSSAPI"), Some("GSSAPI"), Both, JaasTestUtils.KafkaServerContextName)) super.setUp() } + private def clusterAcl(permissionType: PermissionType, operation: Operation): AuthAcl = { + new AuthAcl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"), permissionType, + AuthAcl.WildCardHost, operation) + } + + private def addClusterAcl(permissionType: PermissionType, operation: Operation): Unit = { + val acls = Set(clusterAcl(permissionType, operation)) + val authorizer = servers.head.apis.authorizer.get + val prevAcls = authorizer.getAcls(AuthResource.ClusterResource) + authorizer.addAcls(acls, AuthResource.ClusterResource) + TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource) + } + + private def removeClusterAcl(permissionType: PermissionType, operation: Operation): Unit = { + val acls = Set(clusterAcl(permissionType, operation)) + val authorizer = servers.head.apis.authorizer.get + val prevAcls = authorizer.getAcls(AuthResource.ClusterResource) + Assert.assertTrue(authorizer.removeAcls(acls, AuthResource.ClusterResource)) + TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, AuthResource.ClusterResource) + } + @After override def tearDown(): Unit = { super.tearDown() @@ -54,15 +90,10 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with val ACL_UNKNOWN = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW)); - /** - * Test that ACL operations are not possible when the authorizer is disabled. - * Also see {@link kafka.api.KafkaAdminClientSecureIntegrationTest} for tests of ACL operations - * when the authorizer is enabled. - */ @Test override def testAclOperations(): Unit = { client = AdminClient.create(createConfig()) - assertEquals(0, client.describeAcls(AclBindingFilter.ANY).all().get().size()) + assertEquals(6, client.describeAcls(AclBindingFilter.ANY).all().get().size()) val results = client.createAcls(List(ACL2, ACL3).asJava) assertEquals(Set(ACL2, ACL3), results.results().keySet().asScala) results.results().values().asScala.foreach(value => value.get) @@ -90,9 +121,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with val results = client.createAcls(List(ACL2, ACL2).asJava) assertEquals(Set(ACL2, ACL2), results.results().keySet().asScala) results.all().get() - waitForDescribeAcls(client, AclBindingFilter.ANY, Set(ACL2)) + waitForDescribeAcls(client, ACL2.toFilter, Set(ACL2)) - val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.CLUSTER, null), AccessControlEntryFilter.ANY) + val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null), AccessControlEntryFilter.ANY) val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY) waitForDescribeAcls(client, filterA, Set()) @@ -119,4 +150,126 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with assertFutureExceptionTypeEquals(results.results().get(clusterAcl), classOf[InvalidRequestException]) assertFutureExceptionTypeEquals(results.results().get(emptyResourceNameAcl), classOf[InvalidRequestException]) } + + val FOO_ACL = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) + val FOO_ACL_FILTER = FOO_ACL.toFilter + + private def verifyCauseIsClusterAuth(e: Exception): Unit = { + if (!e.getCause.isInstanceOf[ClusterAuthorizationException]) { + throw e.getCause + } + } + + private def testAclCreateGetDelete(expectAuth: Boolean): Unit = { + TestUtils.waitUntilTrue(() => { + val result = client.createAcls(List(FOO_ACL).asJava, new CreateAclsOptions()) + if (expectAuth) { + Try(result.all().get) match { + case Failure(e: Exception) => { + verifyCauseIsClusterAuth(e) + false + } + case Success(_) => true + } + } else { + Try(result.all().get) match { + case Failure(e: Exception) => { + verifyCauseIsClusterAuth(e) + true + } + case Success(_) => false + } + } + }, "timed out waiting for createAcls to " + (if (expectAuth) "succeed" else "fail")) + if (expectAuth) { + waitForDescribeAcls(client, FOO_ACL_FILTER, Set(FOO_ACL)) + } + TestUtils.waitUntilTrue(() => { + val result = client.deleteAcls(List(FOO_ACL.toFilter).asJava, new DeleteAclsOptions()) + if (expectAuth) { + Try(result.all().get) match { + case Failure(e: Exception) => { + verifyCauseIsClusterAuth(e) + false + } + case Success(_) => true + } + } else { + Try(result.all().get) match { + case Failure(e: Exception) => { + verifyCauseIsClusterAuth(e) + true + } + case Success(removed) => { + assertEquals(Set(FOO_ACL), result.results.get(FOO_ACL_FILTER).get.acls.asScala.map(result => result.acl()).toSet) + true + } + } + } + }, "timed out waiting for deleteAcls to " + (if (expectAuth) "succeed" else "fail")) + if (expectAuth) { + waitForDescribeAcls(client, FOO_ACL_FILTER, Set()) + } + } + + private def testAclGet(expectAuth: Boolean): Unit = { + TestUtils.waitUntilTrue(() => { + val userAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*"), + new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW)) + val results = client.describeAcls(userAcl.toFilter) + if (expectAuth) { + Try(results.all().get) match { + case Failure(e: Exception) => { + verifyCauseIsClusterAuth(e) + false + } + case Success(acls) => Set(userAcl).equals(acls.asScala.toSet) + } + } else { + Try(results.all().get) match { + case Failure(e: Exception) => { + verifyCauseIsClusterAuth(e) + true + } + case Success(_) => false + } + } + }, "timed out waiting for describeAcls to " + (if (expectAuth) "succeed" else "fail")) + } + + @Test + def testAclAuthorizationDenied(): Unit = { + client = AdminClient.create(createConfig()) + + // Test that we cannot create or delete ACLs when Alter is denied. + addClusterAcl(Deny, Alter) + testAclGet(true) + testAclCreateGetDelete(false) + + // Test that we cannot do anything with ACLs when Describe is denied. + removeClusterAcl(Deny, Alter) + addClusterAcl(Deny, Describe) + testAclGet(false) + testAclCreateGetDelete(false) + + // Test that we can create, delete, and get ACLs with the default ACLs. + removeClusterAcl(Deny, Describe) + testAclGet(true) + testAclCreateGetDelete(true) + + // Test that we can't do anything with ACLs without the Allow Alter ACL in place. + removeClusterAcl(Allow, Alter) + removeClusterAcl(Allow, Delete) + testAclGet(false) + testAclCreateGetDelete(false) + + // Test that we can describe, but not alter ACLs, with only the Allow Describe ACL in place. + addClusterAcl(Allow, Describe) + testAclGet(true) + testAclCreateGetDelete(false) + + removeClusterAcl(Allow, Describe) + addClusterAcl(Allow, Alter) + } } diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala index 1df34ea96a2ab..7b5588191eb98 100644 --- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala @@ -14,25 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package kafka.security.auth -import kafka.common.{KafkaException} -import org.junit.{Test, Assert} +import org.apache.kafka.common.acl.AclOperation +import org.junit.{Assert, Test} import org.scalatest.junit.JUnitSuite class OperationTest extends JUnitSuite { - + /** + * Test round trip conversions between org.apache.kafka.common.acl.AclOperation and + * kafka.security.auth.Operation. + */ @Test - def testFromString(): Unit = { - val op = Operation.fromString("READ") - Assert.assertEquals(Read, op) - - try { - Operation.fromString("badName") - fail("Expected exception on invalid operation name.") - } catch { - case _: KafkaException => // expected + def testJavaConversions(): Unit = { + AclOperation.values().foreach { + aclOp => aclOp match { + case AclOperation.UNKNOWN => {} + case AclOperation.ANY => {} + case default => { + val op = Operation.fromJava(aclOp) + val aclOp2 = op.toJava + Assert.assertEquals(aclOp, aclOp2) + } + } } } - } diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala index 1b1c8641c6663..fd78ac22eeb79 100644 --- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala @@ -17,7 +17,8 @@ package kafka.security.auth import kafka.common.KafkaException -import org.junit.{Test, Assert} +import org.apache.kafka.common.acl.AclPermissionType +import org.junit.{Assert, Test} import org.scalatest.junit.JUnitSuite class PermissionTypeTest extends JUnitSuite { @@ -35,4 +36,22 @@ class PermissionTypeTest extends JUnitSuite { } } + /** + * Test round trip conversions between org.apache.kafka.common.acl.AclPermissionType and + * kafka.security.auth.PermissionType. + */ + @Test + def testJavaConversions(): Unit = { + AclPermissionType.values().foreach { + aclPerm => aclPerm match { + case AclPermissionType.UNKNOWN => {} + case AclPermissionType.ANY => {} + case default => { + val perm = PermissionType.fromJava(aclPerm) + val aclPerm2 = perm.toJava + Assert.assertEquals(aclPerm, aclPerm2) + } + } + } + } } diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala index 546c92e71e2b8..cccd0b9eee648 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala @@ -19,6 +19,7 @@ package kafka.security.auth import kafka.common.KafkaException import org.junit.{Test, Assert} import org.scalatest.junit.JUnitSuite +import org.apache.kafka.common.resource.{ResourceType => JResourceType} class ResourceTypeTest extends JUnitSuite { @@ -35,4 +36,22 @@ class ResourceTypeTest extends JUnitSuite { } } + /** + * Test round trip conversions between org.apache.kafka.common.acl.ResourceType and + * kafka.security.auth.ResourceType. + */ + @Test + def testJavaConversions(): Unit = { + JResourceType.values().foreach { + jResourceType => jResourceType match { + case JResourceType.UNKNOWN => {} + case JResourceType.ANY => {} + case default => { + val resourceType = ResourceType.fromJava(jResourceType) + val jResourceType2 = resourceType.toJava + Assert.assertEquals(jResourceType, jResourceType2) + } + } + } + } } From 5eb84c2bfcb0207906624cee5946f5264079326a Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Mon, 5 Jun 2017 22:44:56 -0700 Subject: [PATCH 2/9] Add SimpleAclAuthorizerTest#testAclInheritance --- .../auth/SimpleAclAuthorizerTest.scala | 58 ++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 6017a7bf4cd5f..c7a3e8ce63a19 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -17,7 +17,7 @@ package kafka.security.auth import java.net.InetAddress -import java.util.{UUID} +import java.util.UUID import kafka.network.RequestChannel.Session import kafka.security.auth.Acl.WildCardHost @@ -351,6 +351,62 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer2, commonResource) } + /** + * Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation} + */ + @Test + def testAclInheritance(): Unit = { + testImplicationsOfAllow(All, Set(Read, Write, Create, Delete, Alter, Describe, + ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite)) + testImplicationsOfDeny(All, Set(Read, Write, Create, Delete, Alter, Describe, + ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite)) + testImplicationsOfAllow(Read, Set(Describe)) + testImplicationsOfAllow(Write, Set(Describe)) + testImplicationsOfAllow(Delete, Set(Describe)) + testImplicationsOfAllow(Alter, Set(Describe)) + testImplicationsOfDeny(Describe, Set(Alter)) + testImplicationsOfAllow(AlterConfigs, Set(DescribeConfigs)) + testImplicationsOfDeny(DescribeConfigs, Set(AlterConfigs)) + } + + private def testImplicationsOfAllow(parentOp: Operation, allowedOps: Set[Operation]): Unit = { + val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + val host1 = InetAddress.getByName("192.168.3.1") + val host1Session = new Session(user1, host1) + val acl = new Acl(user1, Allow, WildCardHost, parentOp) + simpleAclAuthorizer.addAcls(Set(acl), Resource.ClusterResource) + Operation.values.foreach { + case op => { + val authorized = simpleAclAuthorizer.authorize(host1Session , op, Resource.ClusterResource) + if (allowedOps.contains(op) || (op == parentOp)) { + assertTrue(s"ALLOW $parentOp should imply ALLOW $op", authorized) + } else { + assertFalse(s"ALLOW $parentOp should not imply ALLOW $op", authorized) + } + } + } + simpleAclAuthorizer.removeAcls(Set(acl), Resource.ClusterResource) + } + + private def testImplicationsOfDeny(parentOp: Operation, deniedOps: Set[Operation]): Unit = { + val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + val host1 = InetAddress.getByName("192.168.3.1") + val host1Session = new Session(user1, host1) + val acls = Set(new Acl(user1, Deny, WildCardHost, parentOp), new Acl(user1, Allow, WildCardHost, All)) + simpleAclAuthorizer.addAcls(acls, Resource.ClusterResource) + Operation.values.foreach { + case op => { + val authorized = simpleAclAuthorizer.authorize(host1Session , op, Resource.ClusterResource) + if (deniedOps.contains(op) || (op == parentOp)) { + assertFalse(s"DENY $parentOp should imply DENY $op", authorized) + } else { + assertTrue(s"DENY $parentOp should not imply DENY $op", authorized) + } + } + } + simpleAclAuthorizer.removeAcls(acls, Resource.ClusterResource) + } + @Test def testHighConcurrencyDeletionOfResourceAcls() { val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All) From bf5c3f3cf4c45791aae4c3968dde5e707a306808 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Tue, 6 Jun 2017 09:50:41 -0700 Subject: [PATCH 3/9] fixes --- .../kafka/common/resource/ResourceType.java | 7 +-- .../kafka/common/acl/AclOperationTest.java | 1 + .../common/acl/AclPermissionTypeTest.java | 1 + .../common/resource/ResourceTypeTest.java | 3 +- .../kafka/security/auth/ResourceType.scala | 8 +--- .../main/scala/kafka/server/KafkaApis.scala | 7 ++- .../api/AdminClientIntegrationTest.scala | 2 +- .../SaslSslAdminClientIntegrationTest.scala | 44 +++++++------------ .../kafka/security/auth/OperationTest.scala | 3 +- 9 files changed, 28 insertions(+), 48 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java index ca3ddaf32839b..f85e2c4267707 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java @@ -50,15 +50,10 @@ public enum ResourceType { */ CLUSTER((byte) 4), - /** - * A broker. - */ - BROKER((byte) 5), - /** * A transactional ID. */ - TRANSACTIONAL_ID((byte) 6); + TRANSACTIONAL_ID((byte) 5); private final static HashMap CODE_TO_VALUE = new HashMap<>(); diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java index 5f5a87cd8e5e0..ba09499a67826 100644 --- a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java @@ -61,6 +61,7 @@ public void testIsUnknown() throws Exception { @Test public void testCode() throws Exception { + assertEquals(AclOperation.values().length, INFOS.length); for (AclOperationTestInfo info : INFOS) { assertEquals(info.operation + " was supposed to have code == " + info.code, info.code, info.operation.code()); diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java index 8e7fdc70d0bd0..15b906866ef76 100644 --- a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java @@ -53,6 +53,7 @@ public void testIsUnknown() throws Exception { @Test public void testCode() throws Exception { + assertEquals(AclPermissionType.values().length, INFOS.length); for (AclPermissionTypeTestInfo info : INFOS) { assertEquals(info.ty + " was supposed to have code == " + info.code, info.code, info.ty.code()); diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java index 4dc4cac7b880c..5d99997ae0134 100644 --- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java +++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java @@ -41,7 +41,7 @@ private static class AclResourceTypeTestInfo { new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false), new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false), new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false), - new AclResourceTypeTestInfo(ResourceType.BROKER, 5, "broker", false) + new AclResourceTypeTestInfo(ResourceType.TRANSACTIONAL_ID, 5, "transactionalId", false) }; @Test @@ -54,6 +54,7 @@ public void testIsUnknown() throws Exception { @Test public void testCode() throws Exception { + assertEquals(ResourceType.values().length, INFOS.length); for (AclResourceTypeTestInfo info : INFOS) { assertEquals(info.resourceType + " was supposed to have code == " + info.code, info.code, info.resourceType.code()); diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala index 2ab995bdd564b..b046dddc00b9c 100644 --- a/core/src/main/scala/kafka/security/auth/ResourceType.scala +++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala @@ -43,12 +43,6 @@ case object Cluster extends ResourceType { val toJava = JResourceType.CLUSTER } -case object Broker extends ResourceType { - val name = "Broker" - val error = Errors.CLUSTER_AUTHORIZATION_FAILED // Same exception as Cluster, for now. - val toJava = JResourceType.BROKER -} - case object TransactionalId extends ResourceType { val name = "TransactionalId" val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED @@ -62,7 +56,7 @@ object ResourceType { rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(","))) } - def values: Seq[ResourceType] = List(Topic, Group, Cluster, Broker, TransactionalId) + def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId) def fromJava(operation: JResourceType): ResourceType = fromString(operation.toString.replaceAll("_", "")) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0930a02cfcb2a..2f981465d9f89 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1858,12 +1858,11 @@ class KafkaApis(val requestChannel: RequestChannel, if (resource.name.isEmpty) throw new InvalidRequestException("Invalid empty resource name") auth.addAcls(immutable.Set(acl), resource) - if (logger.isDebugEnabled) { - logger.debug("added acl " + acl + " to " + resource) - } + if (logger.isDebugEnabled) + logger.debug(s"Added acl $acl to $resource") } catch { case throwable : Throwable => if (logger.isDebugEnabled) { - logger.debug("failed to add acl " + acl + " to " + resource, throwable) + logger.debug(s"Failed to add acl $acl to $resource", throwable) } errors.put(i, throwable) } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 056a0257315db..0a1d229563cbb 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin._ import kafka.utils.{Logging, TestUtils, ZkUtils} import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.KafkaFuture -import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} +import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException} import org.apache.kafka.common.protocol.ApiKeys diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index 50d8abd4da3fd..87a8abfb27fc4 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -14,7 +14,6 @@ package kafka.api import java.io.File -import kafka.admin.AclCommand import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource} import org.apache.kafka.common.protocol.SecurityProtocol import kafka.server.KafkaConfig @@ -40,7 +39,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with override def configureSecurityBeforeServersStart() { val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName()) authorizer.configure(this.configs.head.originals()) - authorizer.addAcls(Set(new AuthAcl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"), Allow, + authorizer.addAcls(Set(new AuthAcl(AuthAcl.WildCardPrincipal, Allow, AuthAcl.WildCardHost, All)), new AuthResource(Topic, "*")) authorizer.addAcls(Set(clusterAcl(Allow, Create), clusterAcl(Allow, Delete), @@ -66,7 +65,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with val authorizer = servers.head.apis.authorizer.get val prevAcls = authorizer.getAcls(AuthResource.ClusterResource) authorizer.addAcls(acls, AuthResource.ClusterResource) - TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource) + TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource) } private def removeClusterAcl(permissionType: PermissionType, operation: Operation): Unit = { @@ -89,6 +88,8 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)); val ACL_UNKNOWN = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW)); + val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) @Test override def testAclOperations(): Unit = { @@ -151,11 +152,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with assertFutureExceptionTypeEquals(results.results().get(emptyResourceNameAcl), classOf[InvalidRequestException]) } - val FOO_ACL = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"), - new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) - val FOO_ACL_FILTER = FOO_ACL.toFilter - - private def verifyCauseIsClusterAuth(e: Exception): Unit = { + private def verifyCauseIsClusterAuth(e: Throwable): Unit = { if (!e.getCause.isInstanceOf[ClusterAuthorizationException]) { throw e.getCause } @@ -163,53 +160,48 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with private def testAclCreateGetDelete(expectAuth: Boolean): Unit = { TestUtils.waitUntilTrue(() => { - val result = client.createAcls(List(FOO_ACL).asJava, new CreateAclsOptions()) + val result = client.createAcls(List(fooAcl).asJava, new CreateAclsOptions()) if (expectAuth) { Try(result.all().get) match { - case Failure(e: Exception) => { + case Failure(e) => verifyCauseIsClusterAuth(e) false - } case Success(_) => true } } else { Try(result.all().get) match { - case Failure(e: Exception) => { + case Failure(e) => verifyCauseIsClusterAuth(e) true - } case Success(_) => false } } }, "timed out waiting for createAcls to " + (if (expectAuth) "succeed" else "fail")) if (expectAuth) { - waitForDescribeAcls(client, FOO_ACL_FILTER, Set(FOO_ACL)) + waitForDescribeAcls(client, fooAcl.toFilter, Set(fooAcl)) } TestUtils.waitUntilTrue(() => { - val result = client.deleteAcls(List(FOO_ACL.toFilter).asJava, new DeleteAclsOptions()) + val result = client.deleteAcls(List(fooAcl.toFilter).asJava, new DeleteAclsOptions()) if (expectAuth) { Try(result.all().get) match { - case Failure(e: Exception) => { + case Failure(e) => verifyCauseIsClusterAuth(e) false - } case Success(_) => true } } else { Try(result.all().get) match { - case Failure(e: Exception) => { + case Failure(e) => verifyCauseIsClusterAuth(e) true - } - case Success(removed) => { - assertEquals(Set(FOO_ACL), result.results.get(FOO_ACL_FILTER).get.acls.asScala.map(result => result.acl()).toSet) + case Success(removed) => + assertEquals(Set(fooAcl), result.results.get(fooAcl.toFilter).get.acls.asScala.map(result => result.acl()).toSet) true - } } } }, "timed out waiting for deleteAcls to " + (if (expectAuth) "succeed" else "fail")) if (expectAuth) { - waitForDescribeAcls(client, FOO_ACL_FILTER, Set()) + waitForDescribeAcls(client, fooAcl.toFilter, Set()) } } @@ -220,18 +212,16 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with val results = client.describeAcls(userAcl.toFilter) if (expectAuth) { Try(results.all().get) match { - case Failure(e: Exception) => { + case Failure(e) => verifyCauseIsClusterAuth(e) false - } case Success(acls) => Set(userAcl).equals(acls.asScala.toSet) } } else { Try(results.all().get) match { - case Failure(e: Exception) => { + case Failure(e) => verifyCauseIsClusterAuth(e) true - } case Success(_) => false } } diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala index 7b5588191eb98..bbbc11ed82527 100644 --- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala @@ -30,8 +30,7 @@ class OperationTest extends JUnitSuite { def testJavaConversions(): Unit = { AclOperation.values().foreach { aclOp => aclOp match { - case AclOperation.UNKNOWN => {} - case AclOperation.ANY => {} + case AclOperation.UNKNOWN | AclOperation.ANY => case default => { val op = Operation.fromJava(aclOp) val aclOp2 = op.toJava From 78350ddaf4d9360cade0a28baa07736708fda673 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Tue, 6 Jun 2017 14:45:37 -0700 Subject: [PATCH 4/9] Fix ResourceTypeTest --- .../java/org/apache/kafka/common/resource/ResourceTypeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java index 5d99997ae0134..9adade1ec4aee 100644 --- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java +++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java @@ -41,7 +41,7 @@ private static class AclResourceTypeTestInfo { new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false), new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false), new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false), - new AclResourceTypeTestInfo(ResourceType.TRANSACTIONAL_ID, 5, "transactionalId", false) + new AclResourceTypeTestInfo(ResourceType.TRANSACTIONAL_ID, 5, "transactional_id", false) }; @Test From 46762741c282399cb22238c3ccf85a789e3c6230 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Tue, 6 Jun 2017 18:04:22 -0700 Subject: [PATCH 5/9] KafkaApis: do not check Resource.ClusterResource when authorizing DescribeConfigs / AlterConfigs --- core/src/main/scala/kafka/server/KafkaApis.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2f981465d9f89..5fb13a6dea123 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2001,8 +2001,7 @@ class KafkaApis(val requestChannel: RequestChannel, case RResourceType.BROKER => authorize(request.session, AlterConfigs, Resource.ClusterResource) case RResourceType.TOPIC => - authorize(request.session, AlterConfigs, new Resource(Topic, resource.name)) || - authorize(request.session, AlterConfigs, Resource.ClusterResource) + authorize(request.session, AlterConfigs, new Resource(Topic, resource.name)) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") } } @@ -2034,8 +2033,7 @@ class KafkaApis(val requestChannel: RequestChannel, resource.`type` match { case RResourceType.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource) case RResourceType.TOPIC => - authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name)) || - authorize(request.session, DescribeConfigs, Resource.ClusterResource) + authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name)) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}") } } From 752628cf58b169a920e7eb1f6329f9b22652e2ef Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Tue, 6 Jun 2017 18:19:56 -0700 Subject: [PATCH 6/9] minor improvements to KAFKA-5292 --- .../apache/kafka/clients/NetworkClient.java | 25 +++++++++++-------- .../controller/ControllerChannelManager.scala | 5 ++-- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 34b9695c6d678..de16d7d43b8a1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -505,22 +505,25 @@ public Node leastLoadedNode(long now) { } public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { - return parseResponseMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0, null); + return createResponse(parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, + null, 0), requestHeader); } - private static AbstractResponse parseResponseMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, - Sensor throttleTimeSensor, long now, StringBuffer structStringBuffer) { + private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, + Sensor throttleTimeSensor, long now) { ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer); // Always expect the response version id to be the same as the request version id ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer); - if (structStringBuffer != null) { - structStringBuffer.append(responseBody.toString()); - } correlate(requestHeader, responseHeader); if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME)) throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME), now); - return AbstractResponse.getResponse(apiKey, responseBody); + return responseBody; + } + + private static AbstractResponse createResponse(Struct responseStruct, RequestHeader requestHeader) { + ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); + return AbstractResponse.getResponse(apiKey, responseStruct); } /** @@ -607,13 +610,13 @@ private void handleCompletedReceives(List responses, long now) { for (NetworkReceive receive : this.selector.completedReceives()) { String source = receive.source(); InFlightRequest req = inFlightRequests.completeNext(source); - StringBuffer structStringBuffer = log.isTraceEnabled() ? new StringBuffer() : null; - AbstractResponse body = parseResponseMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, - throttleTimeSensor, now, structStringBuffer); + Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, + throttleTimeSensor, now); if (log.isTraceEnabled()) { log.trace("Completed receive from node {}, for key {}, received {}", req.destination, - req.header.apiKey(), structStringBuffer.toString()); + req.header.apiKey(), responseStruct.toString()); } + AbstractResponse body = createResponse(responseStruct, req.header); if (req.isInternalRequest && body instanceof MetadataResponse) metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); else if (req.isInternalRequest && body instanceof ApiVersionsResponse) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 013164223cbf1..e5d12e8823938 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -237,14 +237,15 @@ class RequestSendThread(val controllerId: Int, } } if (clientResponse != null) { - val api = ApiKeys.forId(clientResponse.requestHeader.apiKey) + val requestHeader = clientResponse.requestHeader + val api = ApiKeys.forId(requestHeader.apiKey) if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY) throw new KafkaException(s"Unexpected apiKey received: $apiKey") val response = clientResponse.responseBody stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" - .format(controllerId, controllerContext.epoch, response.toString(), brokerNode.toString)) + .format(controllerId, controllerContext.epoch, response.toString(requestHeader.apiVersion), brokerNode.toString)) if (callback != null) { callback(response) From 3dd6ba8035b91e1689f808000ad6647255eb7ad0 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Tue, 6 Jun 2017 20:31:20 -0700 Subject: [PATCH 7/9] remove DENY implications --- .../java/org/apache/kafka/common/acl/AclOperation.java | 3 --- .../scala/kafka/security/auth/SimpleAclAuthorizer.scala | 9 +-------- .../kafka/security/auth/SimpleAclAuthorizerTest.scala | 4 ++-- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java index 7248c51f78381..a939b868b4a57 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java @@ -33,13 +33,10 @@ * ALLOW DELETE implies ALLOW DESCRIBE * * ALLOW ALTER implies ALLOW DESCRIBE - * DENY DESCRIBE implies DENY ALTER * * ALLOW ALTER_CONFIGS implies ALLOW DESCRIBE_CONFIGS - * DENY DESCRIBE_CONFIGS implies DENY ALTER_CONFIGS * * ALLOW ALTER_CONFIGS on kafka-cluster implies ALLOW ALTER_CONFIGS on all topics. - * DENY DESCRIBE_CONFIGS on kafka-cluster implies DENY DESCRIBE_CONFIGS on all topics. */ public enum AclOperation { /** diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 58923d3289147..03eb9e30fbe4a 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -125,14 +125,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource)) // Check if there is any Deny acl match that would disallow this operation. - // If Describe is forbidden, Alter is forbidden as well. - // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance. - val denyOps = operation match { - case Alter => Set[Operation](Alter, Describe) - case AlterConfigs => Set[Operation](AlterConfigs, DescribeConfigs) - case _ => Set[Operation](operation) - } - val denyMatch = denyOps.exists(operation => aclMatch(operation, resource, principal, host, Deny, acls)) + val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls) // Check if there are any Allow ACLs which would allow this operation. // Allowing read, write, delete, or alter implies allowing describe. diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index c7a3e8ce63a19..4ed4b10a65ffd 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -364,9 +364,9 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { testImplicationsOfAllow(Write, Set(Describe)) testImplicationsOfAllow(Delete, Set(Describe)) testImplicationsOfAllow(Alter, Set(Describe)) - testImplicationsOfDeny(Describe, Set(Alter)) + testImplicationsOfDeny(Describe, Set()) testImplicationsOfAllow(AlterConfigs, Set(DescribeConfigs)) - testImplicationsOfDeny(DescribeConfigs, Set(AlterConfigs)) + testImplicationsOfDeny(DescribeConfigs, Set()) } private def testImplicationsOfAllow(parentOp: Operation, allowedOps: Set[Operation]): Unit = { From 35f7196ada1725e1f36e03da0d95db71ef764dad Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Wed, 7 Jun 2017 09:10:13 -0700 Subject: [PATCH 8/9] ACL2 -> acl2, ACL3 -> acl3, etc. --- .../SaslSslAdminClientIntegrationTest.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index 87a8abfb27fc4..d471593a80609 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -82,12 +82,10 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with closeSasl() } - val ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2"), + val acl2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW)); - val ACL3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), + val acl3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)); - val ACL_UNKNOWN = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), - new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW)); val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) @@ -95,17 +93,19 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with override def testAclOperations(): Unit = { client = AdminClient.create(createConfig()) assertEquals(6, client.describeAcls(AclBindingFilter.ANY).all().get().size()) - val results = client.createAcls(List(ACL2, ACL3).asJava) - assertEquals(Set(ACL2, ACL3), results.results().keySet().asScala) + val results = client.createAcls(List(acl2, acl3).asJava) + assertEquals(Set(acl2, acl3), results.results().keySet().asScala) results.results().values().asScala.foreach(value => value.get) - val results2 = client.createAcls(List(ACL_UNKNOWN).asJava) - assertEquals(Set(ACL_UNKNOWN), results2.results().keySet().asScala) + val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW)); + val results2 = client.createAcls(List(aclUnknown).asJava) + assertEquals(Set(aclUnknown), results2.results().keySet().asScala) assertFutureExceptionTypeEquals(results2.all(), classOf[InvalidRequestException]) - val results3 = client.deleteAcls(List(ACL1.toFilter, ACL2.toFilter, ACL3.toFilter).asJava) - assertEquals(Set(ACL1.toFilter, ACL2.toFilter, ACL3.toFilter), results3.results().keySet().asScala) + val results3 = client.deleteAcls(List(ACL1.toFilter, acl2.toFilter, acl3.toFilter).asJava) + assertEquals(Set(ACL1.toFilter, acl2.toFilter, acl3.toFilter), results3.results().keySet().asScala) assertEquals(0, results3.results.get(ACL1.toFilter).get.acls.size()) - assertEquals(Set(ACL2), results3.results.get(ACL2.toFilter).get.acls.asScala.map(result => result.acl()).toSet) - assertEquals(Set(ACL3), results3.results.get(ACL3.toFilter).get.acls.asScala.map(result => result.acl()).toSet) + assertEquals(Set(acl2), results3.results.get(acl2.toFilter).get.acls.asScala.map(result => result.acl()).toSet) + assertEquals(Set(acl3), results3.results.get(acl3.toFilter).get.acls.asScala.map(result => result.acl()).toSet) client.close() } @@ -119,10 +119,10 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with @Test def testAclOperations2(): Unit = { client = AdminClient.create(createConfig()) - val results = client.createAcls(List(ACL2, ACL2).asJava) - assertEquals(Set(ACL2, ACL2), results.results().keySet().asScala) + val results = client.createAcls(List(acl2, acl2).asJava) + assertEquals(Set(acl2, acl2), results.results().keySet().asScala) results.all().get() - waitForDescribeAcls(client, ACL2.toFilter, Set(ACL2)) + waitForDescribeAcls(client, acl2.toFilter, Set(acl2)) val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null), AccessControlEntryFilter.ANY) val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY) @@ -132,7 +132,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions()) assertEquals(Set(filterA, filterB), results2.results().keySet().asScala) assertEquals(Set(), results2.results.get(filterA).get.acls.asScala.map(result => result.acl()).toSet) - assertEquals(Set(ACL2), results2.results.get(filterB).get.acls.asScala.map(result => result.acl()).toSet) + assertEquals(Set(acl2), results2.results.get(filterB).get.acls.asScala.map(result => result.acl()).toSet) waitForDescribeAcls(client, filterB, Set()) From 969246b676867df6f76a8eff0bd40b99ddbabf72 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Wed, 7 Jun 2017 09:28:20 -0700 Subject: [PATCH 9/9] Update testAclAuthorizationDenied to take into account recent ACL inheritance changes --- .../kafka/api/SaslSslAdminClientIntegrationTest.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index d471593a80609..c298a7af2e6dc 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -237,14 +237,16 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with testAclGet(true) testAclCreateGetDelete(false) - // Test that we cannot do anything with ACLs when Describe is denied. + // Test that we cannot do anything with ACLs when Describe and Alter are denied. removeClusterAcl(Deny, Alter) addClusterAcl(Deny, Describe) + addClusterAcl(Deny, Alter) testAclGet(false) testAclCreateGetDelete(false) // Test that we can create, delete, and get ACLs with the default ACLs. removeClusterAcl(Deny, Describe) + removeClusterAcl(Deny, Alter) testAclGet(true) testAclCreateGetDelete(true)