diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0a7607acb755..4a6082e310b4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -352,7 +352,7 @@ class KafkaApis(val requestChannel: RequestChannel, } // reject the request if not authorized to the group - if (!authorize(request, READ, GROUP, offsetCommitRequest.data.groupId)) { + if (!authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { val error = Errors.GROUP_AUTHORIZATION_FAILED val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( offsetCommitRequest.data.topics, @@ -378,7 +378,8 @@ class KafkaApis(val requestChannel: RequestChannel, } else { val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition] - val authorizedTopics = filterAuthorized(request, READ, TOPIC, offsetCommitRequest.data.topics.asScala.map(_.name)) + val authorizedTopics = filterAuthorized(request.context, READ, TOPIC, + offsetCommitRequest.data.topics.asScala.map(_.name)) for (topicData <- offsetCommitRequest.data.topics.asScala) { for (partitionData <- topicData.partitions.asScala) { val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) @@ -472,14 +473,14 @@ class KafkaApis(val requestChannel: RequestChannel, if (produceRequest.hasTransactionalRecords) { val isAuthorizedTransactional = produceRequest.transactionalId != null && - authorize(request, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId) + authorize(request.context, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId) if (!isAuthorizedTransactional) { sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) return } // Note that authorization to a transactionalId implies ProducerId authorization - } else if (produceRequest.hasIdempotentRecords && !authorize(request, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) { + } else if (produceRequest.hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) { sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception) return } @@ -488,7 +489,7 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]() val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() - val authorizedTopics = filterAuthorized(request, WRITE, TOPIC, + val authorizedTopics = filterAuthorized(request.context, WRITE, TOPIC, produceRequest.partitionRecordsOrFail.asScala.toSeq.map(_._1.topic)) for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) { @@ -625,7 +626,7 @@ class KafkaApis(val requestChannel: RequestChannel, val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. - if (authorize(request, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { + if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { fetchContext.foreachPartition { (topicPartition, data) => if (!metadataCache.contains(topicPartition)) erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) @@ -641,7 +642,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Regular Kafka consumers need READ permission on each partition they are fetching. val fetchTopics = new mutable.ArrayBuffer[String] fetchContext.foreachPartition { (topicPartition, _) => fetchTopics += topicPartition.topic } - val authorizedTopics = filterAuthorized(request, READ, TOPIC, fetchTopics) + val authorizedTopics = filterAuthorized(request.context, READ, TOPIC, fetchTopics) fetchContext.foreachPartition { (topicPartition, data) => if (!authorizedTopics.contains(topicPartition.topic)) erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED) @@ -887,7 +888,8 @@ class KafkaApis(val requestChannel: RequestChannel, val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic)) + val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC, + offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic)) val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition { case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic) } @@ -927,7 +929,8 @@ class KafkaApis(val requestChannel: RequestChannel, val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic)) + val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC, + offsetRequest.partitionTimestamps.asScala.toSeq.map(_._1.topic)) val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition { case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic) } @@ -1111,15 +1114,17 @@ class KafkaApis(val requestChannel: RequestChannel, else metadataRequest.topics.asScala.toSet - val authorizedForDescribeTopics = filterAuthorized(request, DESCRIBE, TOPIC, topics.toSeq, logIfDenied = !metadataRequest.isAllTopics) + val authorizedForDescribeTopics = filterAuthorized(request.context, DESCRIBE, TOPIC, + topics.toSeq, logIfDenied = !metadataRequest.isAllTopics) var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(authorizedForDescribeTopics.contains) var unauthorizedForCreateTopics = Set[String]() if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { - if (!authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) { - val authorizedForCreateTopics = filterAuthorized(request, CREATE, TOPIC, nonExistingTopics.toSeq) + if (!authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) { + val authorizedForCreateTopics = filterAuthorized(request.context, CREATE, TOPIC, + nonExistingTopics.toSeq) unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics) authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics) } @@ -1156,7 +1161,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (request.header.apiVersion >= 8) { // get cluster authorized operations if (metadataRequest.data.includeClusterAuthorizedOperations) { - if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME)) + if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) clusterAuthorizedOperations = authorizedOperations(request, Resource.CLUSTER) else clusterAuthorizedOperations = 0 @@ -1196,14 +1201,14 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetFetchRequest = request.body[OffsetFetchRequest] def partitionAuthorized[T](elements: List[T], topic: T => String): (Seq[T], Seq[T]) = { - val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, elements.map(topic)) + val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC, elements.map(topic)) elements.partition(element => authorizedTopics.contains(topic.apply(element))) } def createResponse(requestThrottleMs: Int): AbstractResponse = { val offsetFetchResponse = // reject the request if not authorized to the group - if (!authorize(request, DESCRIBE, GROUP, offsetFetchRequest.groupId)) + if (!authorize(request.context, DESCRIBE, GROUP, offsetFetchRequest.groupId)) offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED) else { if (header.apiVersion == 0) { @@ -1270,10 +1275,10 @@ class KafkaApis(val requestChannel: RequestChannel, val findCoordinatorRequest = request.body[FindCoordinatorRequest] if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id && - !authorize(request, DESCRIBE, GROUP, findCoordinatorRequest.data.key)) + !authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key)) sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id && - !authorize(request, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) + !authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { // get metadata (and create the topic if necessary) @@ -1342,7 +1347,7 @@ class KafkaApis(val requestChannel: RequestChannel, val describeGroupsResponseData = new DescribeGroupsResponseData() describeRequest.data.groups.asScala.foreach { groupId => - if (!authorize(request, DESCRIBE, GROUP, groupId)) { + if (!authorize(request.context, DESCRIBE, GROUP, groupId)) { describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, Errors.GROUP_AUTHORIZATION_FAILED)) } else { val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) @@ -1379,7 +1384,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListGroupsRequest(request: RequestChannel.Request): Unit = { val (error, groups) = groupCoordinator.handleListGroups() - if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME)) + if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) // With describe cluster access all groups are returned. We keep this alternative for backward compatibility. sendResponseMaybeThrottle(request, requestThrottleMs => new ListGroupsResponse(new ListGroupsResponseData() @@ -1391,7 +1396,7 @@ class KafkaApis(val requestChannel: RequestChannel, .setThrottleTimeMs(requestThrottleMs) )) else { - val filteredGroups = groups.filter(group => authorize(request, DESCRIBE, GROUP, group.groupId)) + val filteredGroups = groups.filter(group => authorize(request.context, DESCRIBE, GROUP, group.groupId)) sendResponseMaybeThrottle(request, requestThrottleMs => new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(error.code) @@ -1439,7 +1444,7 @@ class KafkaApis(val requestChannel: RequestChannel, // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION)) - } else if (!authorize(request, READ, GROUP, joinGroupRequest.data.groupId)) { + } else if (!authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) { sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED)) } else { val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId) @@ -1490,7 +1495,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else if (!syncGroupRequest.areMandatoryProtocolTypeAndNamePresent()) { // Starting from version 5, ProtocolType and ProtocolName fields are mandatory. sendResponseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL)) - } else if (!authorize(request, READ, GROUP, syncGroupRequest.data.groupId)) { + } else if (!authorize(request.context, READ, GROUP, syncGroupRequest.data.groupId)) { sendResponseCallback(SyncGroupResult(Errors.GROUP_AUTHORIZATION_FAILED)) } else { val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]] @@ -1516,7 +1521,7 @@ class KafkaApis(val requestChannel: RequestChannel, val groups = deleteGroupsRequest.data.groupsNames.asScala.toSet val (authorizedGroups, unauthorizedGroups) = groups.partition { group => - authorize(request, DELETE, GROUP, group) + authorize(request.context, DELETE, GROUP, group) } val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++ @@ -1560,7 +1565,7 @@ class KafkaApis(val requestChannel: RequestChannel, // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. sendResponseCallback(Errors.UNSUPPORTED_VERSION) - } else if (!authorize(request, READ, GROUP, heartbeatRequest.data.groupId)) { + } else if (!authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) { sendResponseMaybeThrottle(request, requestThrottleMs => new HeartbeatResponse( new HeartbeatResponseData() @@ -1582,7 +1587,7 @@ class KafkaApis(val requestChannel: RequestChannel, val members = leaveGroupRequest.members.asScala.toList - if (!authorize(request, READ, GROUP, leaveGroupRequest.data.groupId)) { + if (!authorize(request.context, READ, GROUP, leaveGroupRequest.data.groupId)) { sendResponseMaybeThrottle(request, requestThrottleMs => { new LeaveGroupResponse(new LeaveGroupResponseData() .setThrottleTimeMs(requestThrottleMs) @@ -1673,11 +1678,14 @@ class KafkaApis(val requestChannel: RequestChannel, createTopicsRequest.data.topics.asScala.foreach { topic => results.add(new CreatableTopicResult().setName(topic.name)) } - val hasClusterAuthorization = authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false) + val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, + logIfDenied = false) val topics = createTopicsRequest.data.topics.asScala.map(_.name) - val authorizedTopics = if (hasClusterAuthorization) topics.toSet else filterAuthorized(request, CREATE, TOPIC, topics.toSeq) - val authorizedForDescribeConfigs = filterAuthorized(request, DESCRIBE_CONFIGS, TOPIC, topics.toSeq, logIfDenied = false) - .map(name => name -> results.find(name)).toMap + val authorizedTopics = + if (hasClusterAuthorization) topics.toSet + else filterAuthorized(request.context, CREATE, TOPIC, topics.toSeq) + val authorizedForDescribeConfigs = filterAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, + topics.toSeq, logIfDenied = false).map(name => name -> results.find(name)).toMap results.asScala.foreach(topic => { if (results.findAll(topic.name).size > 1) { @@ -1753,7 +1761,7 @@ class KafkaApis(val requestChannel: RequestChannel, .filter { _._2.size > 1 } .keySet val notDuped = topics.filterNot(topic => dupes.contains(topic.name)) - val authorizedTopics = filterAuthorized(request, ALTER, TOPIC, notDuped.map(_.name)) + val authorizedTopics = filterAuthorized(request.context, ALTER, TOPIC, notDuped.map(_.name)) val (authorized, unauthorized) = notDuped.partition { topic => authorizedTopics.contains(topic.name) } val (queuedForDeletion, valid) = authorized.partition { topic => @@ -1807,7 +1815,8 @@ class KafkaApis(val requestChannel: RequestChannel, results.add(new DeletableTopicResult() .setName(topic)) } - val authorizedTopics = filterAuthorized(request, DELETE, TOPIC, results.asScala.toSeq.map(_.name)) + val authorizedTopics = filterAuthorized(request.context, DELETE, TOPIC, + results.asScala.toSeq.map(_.name)) results.asScala.foreach(topic => { if (!authorizedTopics.contains(topic.name)) topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) @@ -1845,7 +1854,7 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopicResponses = mutable.Map[TopicPartition, DeleteRecordsPartitionResult]() val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]() - val authorizedTopics = filterAuthorized(request, DELETE, TOPIC, + val authorizedTopics = filterAuthorized(request.context, DELETE, TOPIC, deleteRecordsRequest.data.topics.asScala.map(_.name)) val deleteTopicPartitions = deleteRecordsRequest.data.topics.asScala.flatMap(deleteTopic => { deleteTopic.partitions.asScala.map(deletePartition => { @@ -1910,11 +1919,11 @@ class KafkaApis(val requestChannel: RequestChannel, val transactionalId = initProducerIdRequest.data.transactionalId if (transactionalId != null) { - if (!authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId)) { + if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) { sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) return } - } else if (!authorize(request, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) { + } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) { sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception) return } @@ -1951,7 +1960,7 @@ class KafkaApis(val requestChannel: RequestChannel, val endTxnRequest = request.body[EndTxnRequest] val transactionalId = endTxnRequest.data.transactionalId - if (authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId)) { + if (authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) { def sendResponseCallback(error: Errors): Unit = { def createResponse(requestThrottleMs: Int): AbstractResponse = { val responseBody = new EndTxnResponse(new EndTxnResponseData() @@ -2093,7 +2102,7 @@ class KafkaApis(val requestChannel: RequestChannel, val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] val transactionalId = addPartitionsToTxnRequest.transactionalId val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala - if (!authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId)) + if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) sendResponseMaybeThrottle(request, requestThrottleMs => addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) else { @@ -2101,7 +2110,7 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() val authorizedPartitions = mutable.Set[TopicPartition]() - val authorizedTopics = filterAuthorized(request, WRITE, TOPIC, + val authorizedTopics = filterAuthorized(request.context, WRITE, TOPIC, partitionsToAdd.map(_.topic).filterNot(org.apache.kafka.common.internals.Topic.isInternal)) for (topicPartition <- partitionsToAdd) { if (!authorizedTopics.contains(topicPartition.topic)) @@ -2148,10 +2157,10 @@ class KafkaApis(val requestChannel: RequestChannel, val groupId = addOffsetsToTxnRequest.consumerGroupId val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) - if (!authorize(request, WRITE, TRANSACTIONAL_ID, transactionalId)) + if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) sendResponseMaybeThrottle(request, requestThrottleMs => new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) - else if (!authorize(request, READ, GROUP, groupId)) + else if (!authorize(request.context, READ, GROUP, groupId)) sendResponseMaybeThrottle(request, requestThrottleMs => new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)) else { @@ -2180,15 +2189,15 @@ class KafkaApis(val requestChannel: RequestChannel, // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization // since it is implied by transactionalId authorization - if (!authorize(request, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId)) + if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId)) sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) - else if (!authorize(request, READ, GROUP, txnOffsetCommitRequest.data.groupId)) + else if (!authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) else { val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]() - val authorizedTopics = filterAuthorized(request, READ, TOPIC, txnOffsetCommitRequest.offsets.keySet.asScala.toSeq.map(_.topic)) + val authorizedTopics = filterAuthorized(request.context, READ, TOPIC, txnOffsetCommitRequest.offsets.keySet.asScala.toSeq.map(_.topic)) for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) { if (!authorizedTopics.contains(topicPartition.topic)) @@ -2358,10 +2367,10 @@ class KafkaApis(val requestChannel: RequestChannel, // The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required // cluster permission. With KIP-320, the consumer now also uses this API to check for log truncation // following a leader change, so we also allow topic describe permission. - val (authorizedPartitions, unauthorizedPartitions) = if (authorize(request, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false)) { + val (authorizedPartitions, unauthorizedPartitions) = if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false)) { (requestInfo, Map.empty[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]) } else { - val authorizedTopics = filterAuthorized(request, DESCRIBE, TOPIC, requestInfo.keySet.toSeq.map(_.topic)) + val authorizedTopics = filterAuthorized(request.context, DESCRIBE, TOPIC, requestInfo.keySet.toSeq.map(_.topic)) requestInfo.partition { case (tp, _) => authorizedTopics.contains(tp.topic) } @@ -2385,9 +2394,9 @@ class KafkaApis(val requestChannel: RequestChannel, case ConfigResource.Type.BROKER_LOGGER => throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}") case ConfigResource.Type.BROKER => - authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) + authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) case ConfigResource.Type.TOPIC => - authorize(request, ALTER_CONFIGS, TOPIC, resource.name) + authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") } } @@ -2505,9 +2514,9 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) => resource.`type` match { case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => - authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) + authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) case ConfigResource.Type.TOPIC => - authorize(request, ALTER_CONFIGS, TOPIC, resource.name) + authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") } } @@ -2526,9 +2535,9 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource => resource.`type` match { case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => - authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME) + authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME) case ConfigResource.Type.TOPIC => - authorize(request, DESCRIBE_CONFIGS, TOPIC, resource.name) + authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.name) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}") } } @@ -2547,7 +2556,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit = { val alterReplicaDirsRequest = request.body[AlterReplicaLogDirsRequest] val responseMap = { - if (authorize(request, ALTER, CLUSTER, CLUSTER_NAME)) + if (authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) replicaManager.alterReplicaLogDirs(alterReplicaDirsRequest.partitionDirs.asScala) else alterReplicaDirsRequest.partitionDirs.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap @@ -2558,7 +2567,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit = { val describeLogDirsDirRequest = request.body[DescribeLogDirsRequest] val logDirInfos = { - if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME)) { + if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) { val partitions = if (describeLogDirsDirRequest.isAllTopicPartitions) replicaManager.logManager.allLogs.map(_.topicPartition).toSet @@ -2691,7 +2700,7 @@ class KafkaApis(val requestChannel: RequestChannel, None else Some(describeTokenRequest.data.owners.asScala.map(p => new KafkaPrincipal(p.principalType(), p.principalName)).toList) - def authorizeToken(tokenId: String) = authorize(request, DESCRIBE, DELEGATION_TOKEN, tokenId) + def authorizeToken(tokenId: String) = authorize(request.context, DESCRIBE, DELEGATION_TOKEN, tokenId) def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken) val tokens = tokenManager.getTokens(eligible) sendResponseCallback(Errors.NONE, tokens) @@ -2758,7 +2767,7 @@ class KafkaApis(val requestChannel: RequestChannel, }) } - if (!authorize(request, ALTER, CLUSTER, CLUSTER_NAME)) { + if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) { val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null) val partitionErrors: Map[TopicPartition, ApiError] = electionRequest.topicPartitions.iterator.map(partition => partition -> error).toMap @@ -2785,8 +2794,8 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetDeleteRequest = request.body[OffsetDeleteRequest] val groupId = offsetDeleteRequest.data.groupId - if (authorize(request, DELETE, GROUP, groupId)) { - val authorizedTopics = filterAuthorized(request, READ, TOPIC, + if (authorize(request.context, DELETE, GROUP, groupId)) { + val authorizedTopics = filterAuthorized(request.context, READ, TOPIC, offsetDeleteRequest.data.topics.asScala.map(_.name).toSeq) val topicPartitionErrors = mutable.Map[TopicPartition, Errors]() @@ -2842,7 +2851,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDescribeClientQuotasRequest(request: RequestChannel.Request): Unit = { val describeClientQuotasRequest = request.body[DescribeClientQuotasRequest] - if (authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { + if (authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter).map { case (quotaEntity, quotaConfigs) => quotaEntity -> quotaConfigs.map { case (key, value) => key -> Double.box(value) }.asJava }.asJava @@ -2857,7 +2866,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = { val alterClientQuotasRequest = request.body[AlterClientQuotasRequest] - if (authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) { + if (authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) { val result = adminManager.alterClientQuotas(alterClientQuotasRequest.entries().asScala.toSeq, alterClientQuotasRequest.validateOnly()).asJava sendResponseMaybeThrottle(request, requestThrottleMs => @@ -2868,45 +2877,48 @@ class KafkaApis(val requestChannel: RequestChannel, } } - private def authorize(request: RequestChannel.Request, - operation: AclOperation, - resourceType: ResourceType, - resourceName: String, - logIfAllowed: Boolean = true, - logIfDenied: Boolean = true, - refCount: Int = 1): Boolean = { + // private package for testing + private[server] def authorize(requestContext: RequestContext, + operation: AclOperation, + resourceType: ResourceType, + resourceName: String, + logIfAllowed: Boolean = true, + logIfDenied: Boolean = true, + refCount: Int = 1): Boolean = { authorizer.forall { authZ => val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL) val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied)) - authZ.authorize(request.context, actions).asScala.head == AuthorizationResult.ALLOWED + authZ.authorize(requestContext, actions).asScala.head == AuthorizationResult.ALLOWED } } - private def filterAuthorized(request: RequestChannel.Request, - operation: AclOperation, - resourceType: ResourceType, - resourceNames: Seq[String], - logIfAllowed: Boolean = true, - logIfDenied: Boolean = true): Set[String] = { + // private package for testing + private[server] def filterAuthorized(requestContext: RequestContext, + operation: AclOperation, + resourceType: ResourceType, + resourceNames: Seq[String], + logIfAllowed: Boolean = true, + logIfDenied: Boolean = true): Set[String] = { + val uniqueResourceNames = resourceNames.distinct authorizer match { case Some(authZ) => val groupedResourceNames = resourceNames.groupBy(identity) - val actions = resourceNames.map { resourceName => + val actions = uniqueResourceNames.map { resourceName => val count = groupedResourceNames(resourceName).size val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL) new Action(operation, resource, count, logIfAllowed, logIfDenied) } - authZ.authorize(request.context, actions.asJava).asScala - .zip(resourceNames) + authZ.authorize(requestContext, actions.asJava).asScala + .zip(uniqueResourceNames) .filter { case (authzResult, _) => authzResult == AuthorizationResult.ALLOWED } .map { case (_, resourceName) => resourceName }.toSet case None => - resourceNames.toSet + uniqueResourceNames.toSet } } private def authorizeClusterOperation(request: RequestChannel.Request, operation: AclOperation): Unit = { - if (!authorize(request, operation, CLUSTER, CLUSTER_NAME)) + if (!authorize(request.context, operation, CLUSTER, CLUSTER_NAME)) throw new ClusterAuthorizationException(s"Request $request is not authorized.") } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 1b6c32856d57..a4e914573daf 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -40,6 +40,7 @@ import kafka.network.RequestChannel.SendResponse import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.{MockTime, TestUtils} import kafka.zk.KafkaZkClient +import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.internals.Topic @@ -59,7 +60,12 @@ import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _} +import org.apache.kafka.common.resource.PatternType +import org.apache.kafka.common.resource.ResourcePattern +import org.apache.kafka.common.resource.ResourceType import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.server.authorizer.Action +import org.apache.kafka.server.authorizer.AuthorizationResult import org.apache.kafka.server.authorizer.Authorizer import org.easymock.EasyMock._ import org.easymock.{Capture, EasyMock, IAnswer} @@ -82,7 +88,6 @@ class KafkaApisTest { private val metrics = new Metrics() private val brokerId = 1 private val metadataCache = new MetadataCache(brokerId) - private val authorizer: Option[Authorizer] = None private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager]) private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager]) private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager]) @@ -101,7 +106,8 @@ class KafkaApisTest { metrics.close() } - def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): KafkaApis = { + def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion, + authorizer: Option[Authorizer] = None): KafkaApis = { val properties = TestUtils.createBrokerConfig(brokerId, "zk") properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString) properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString) @@ -126,6 +132,93 @@ class KafkaApisTest { ) } + @Test + def testAuthorize(): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + + val operation = AclOperation.WRITE + val resourceType = ResourceType.TOPIC + val resourceName = "topic-1" + val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion, + clientId, 0) + val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost, + KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY) + + val expectedActions = Seq( + new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL), + 1, true, true) + ) + + EasyMock.expect(authorizer.authorize( + requestContext, expectedActions.asJava + )).andReturn( + Seq(AuthorizationResult.ALLOWED).asJava + ).once() + + EasyMock.replay(authorizer) + + val result = createKafkaApis(authorizer = Some(authorizer)).authorize( + requestContext, + operation, + resourceType, + resourceName, + ) + + verify(authorizer) + + assertEquals(true, result) + } + + @Test + def testFilterAuthorized(): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + + val operation = AclOperation.WRITE + val resourceType = ResourceType.TOPIC + val resourceName1 = "topic-1" + val resourceName2 = "topic-2" + val resourceName3 = "topic-3" + val requestHeader = new RequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion, + clientId, 0) + val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost, + KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY) + + val expectedActions = Seq( + new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL), + 2, true, true), + new Action(operation, new ResourcePattern(resourceType, resourceName2, PatternType.LITERAL), + 1, true, true), + new Action(operation, new ResourcePattern(resourceType, resourceName3, PatternType.LITERAL), + 1, true, true), + ) + + EasyMock.expect(authorizer.authorize( + requestContext, expectedActions.asJava + )).andReturn( + Seq( + AuthorizationResult.ALLOWED, + AuthorizationResult.DENIED, + AuthorizationResult.ALLOWED + ).asJava + ).once() + + EasyMock.replay(authorizer) + + val result = createKafkaApis(authorizer = Some(authorizer)).filterAuthorized( + requestContext, + operation, + resourceType, + // Duplicate resource names should not trigger multiple calls to authorize + Seq(resourceName1, resourceName2, resourceName1, resourceName3) + ) + + verify(authorizer) + + assertEquals(Set(resourceName1, resourceName3), result) + } + @Test def testOffsetCommitWithInvalidPartition(): Unit = { val topic = "topic"