KAFKA-15318: Update the Authorizer via AclPublisher (apache#14169)
On the controller, move publishing ACLs to the Authorizer into a dedicated MetadataPublisher,
AclPublisher. This publisher listens for notifications from the MetadataLoader, and receives only
committed data. This brings the controller side in line with how the broker has always worked. It
also avoids some ugly code related to publishing directly from the QuorumController. Most important
of all, it clears the way to implement metadata transactions without worrying about Authorizer
state (since it will be handled by the MetadataLoader, along with the rest of the metadata image state).
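
A MetadataPublisher only ever sees committed data, because the MetadataLoader invokes it after
records have been applied to a metadata image. A minimal sketch of the pattern (illustrative
only, not part of this change; LoggingPublisher is a made-up name):

    import org.apache.kafka.image.loader.LoaderManifest
    import org.apache.kafka.image.publisher.MetadataPublisher
    import org.apache.kafka.image.{MetadataDelta, MetadataImage}

    class LoggingPublisher(nodeId: Int) extends MetadataPublisher {
      override def name(): String = s"LoggingPublisher id=$nodeId"

      // The MetadataLoader calls this only after records are committed and
      // applied to newImage, so a publisher never observes in-flight state.
      override def onMetadataUpdate(
        delta: MetadataDelta,
        newImage: MetadataImage,
        manifest: LoaderManifest
      ): Unit = {
        println(s"committed metadata image now at offset ${newImage.offset()}")
      }
    }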

In AclsDelta, we can remove isSnapshotDelta, since we always know when the MetadataLoader is
giving us a snapshot. Also bring AclsDelta in line with the other delta classes, where
completeSnapshot calculates the diff between the previous image and the next one. We don't use
this delta (since we just apply the image directly to the authorizer), but we should have it for
consistency.
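
The diff calculation is essentially a set difference between two maps. A hedged sketch with
simplified generic types (the real AclsDelta keys changes by Uuid and stores
Optional[StandardAcl] values; this standalone version is only illustrative):

    // Entries missing from the next image become removals (None); new or
    // changed entries become additions (Some(v)). The result is exactly the
    // difference between the previous image and the next one.
    def completeSnapshot[K, V](prev: Map[K, V], next: Map[K, V]): Map[K, Option[V]] = {
      val removals = (prev.keySet -- next.keySet).map(k => k -> (None: Option[V])).toMap
      val updates = next.collect {
        case (k, v) if !prev.get(k).contains(v) => k -> (Some(v): Option[V])
      }
      removals ++ updates
    }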

Finally, change MockAclMutator to avoid the need to subclass AclControlManager.
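
Since AclControlManager's constructor becomes private (see the diff below), test code can no
longer subclass it; instead, it builds a real manager through the Builder and layers mock
behavior around it. A hedged sketch of that shape (the Builder calls match the diff below; the
surrounding test scaffolding is assumed):

    import org.apache.kafka.common.utils.LogContext
    import org.apache.kafka.controller.AclControlManager

    // Composition instead of inheritance: build the real manager, then drive
    // createAcls/deleteAcls/replay on it from the mock mutator.
    val manager = new AclControlManager.Builder()
      .setLogContext(new LogContext())
      .build()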

Reviewers: David Arthur <mumrah@gmail.com>
cmccabe authored and jeqo committed Aug 15, 2023
1 parent 6b7bfc0 commit cb6c2d8
Showing 15 changed files with 252 additions and 336 deletions.
18 changes: 7 additions & 11 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -25,7 +25,7 @@ import kafka.log.remote.RemoteLogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
-import kafka.server.metadata.{BrokerMetadataPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
+import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.NetworkClient
 import org.apache.kafka.common.config.ConfigException
@@ -42,7 +42,6 @@ import org.apache.kafka.coordinator.group
 import org.apache.kafka.coordinator.group.util.SystemTimerReaper
 import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
 import org.apache.kafka.image.publisher.MetadataPublisher
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
@@ -419,7 +418,12 @@ class BrokerServer(
           sharedServer.metadataPublishingFaultHandler,
           "broker",
           credentialProvider),
-        authorizer,
+        new AclPublisher(
+          config.nodeId,
+          sharedServer.metadataPublishingFaultHandler,
+          "broker",
+          authorizer
+        ),
         sharedServer.initialBrokerMetadataLoadFaultHandler,
         sharedServer.metadataPublishingFaultHandler)
       metadataPublishers.add(brokerMetadataPublisher)
@@ -468,14 +472,6 @@
         rlm.startup()
       })

-      // If we are using a ClusterMetadataAuthorizer which stores its ACLs in the metadata log,
-      // notify it that the loading process is complete.
-      authorizer match {
-        case Some(clusterMetadataAuthorizer: ClusterMetadataAuthorizer) =>
-          clusterMetadataAuthorizer.completeInitialLoad()
-        case _ => // nothing to do
-      }
-
      // We're now ready to unfence the broker. This also allows this broker to transition
      // from RECOVERY state to RUNNING state, once the controller unfences the broker.
      FutureUtils.waitWithLogging(logger.underlying, logIdent,
19 changes: 15 additions & 4 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -26,7 +26,7 @@ import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli
 import kafka.server.QuotaFactory.QuotaManagers

 import scala.collection.immutable
-import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher}
+import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher}
 import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
 import kafka.zk.{KafkaZkClient, ZkMigrationClient}
 import org.apache.kafka.common.config.ConfigException
@@ -238,11 +238,14 @@ class ControllerServer(
         setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler).
         setZkMigrationEnabled(config.migrationEnabled)
     }
+    controller = controllerBuilder.build()
+
+    // If we are using a ClusterMetadataAuthorizer, requests to add or remove ACLs must go
+    // through the controller.
     authorizer match {
-      case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
-      case _ => // nothing to do
+      case Some(a: ClusterMetadataAuthorizer) => a.setAclMutator(controller)
+      case _ =>
     }
-    controller = controllerBuilder.build()

     if (config.migrationEnabled) {
       val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time, config, KafkaServer.zkClientConfigFromKafkaConfig(config))
@@ -361,6 +364,14 @@
       sharedServer.metadataPublishingFaultHandler
     ))

+    // Set up the ACL publisher.
+    metadataPublishers.add(new AclPublisher(
+      config.nodeId,
+      sharedServer.metadataPublishingFaultHandler,
+      "controller",
+      authorizer
+    ))
+
     // Install all metadata publishers.
     FutureUtils.waitWithLogging(logger.underlying, logIdent,
       "the controller metadata publishers to be installed",
102 changes: 102 additions & 0 deletions core/src/main/scala/kafka/server/metadata/AclPublisher.scala
@@ -0,0 +1,102 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server.metadata

import kafka.utils.Logging
import org.apache.kafka.image.loader.{LoaderManifest, LoaderManifestType}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.fault.FaultHandler

import scala.concurrent.TimeoutException


class AclPublisher(
  nodeId: Int,
  faultHandler: FaultHandler,
  nodeType: String,
  authorizer: Option[Authorizer],
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
  logIdent = s"[${name()}] "

  override def name(): String = s"AclPublisher ${nodeType} id=${nodeId}"

  var completedInitialLoad = false

  override def onMetadataUpdate(
    delta: MetadataDelta,
    newImage: MetadataImage,
    manifest: LoaderManifest
  ): Unit = {
    val deltaName = s"MetadataDelta up to ${newImage.offset()}"

    // Apply changes to ACLs. This needs to be handled carefully because while we are
    // applying these changes, the Authorizer is continuing to return authorization
    // results in other threads. We never want to expose an invalid state. For example,
    // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
    // we want to apply those changes in that order, not the reverse order! Otherwise
    // there could be a window during which incorrect authorization results are returned.
    Option(delta.aclsDelta()).foreach { aclsDelta =>
      authorizer match {
        case Some(authorizer: ClusterMetadataAuthorizer) => if (manifest.`type`().equals(LoaderManifestType.SNAPSHOT)) {
          try {
            // If the delta resulted from a snapshot load, we want to apply the new changes
            // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
            // first snapshot load, it will also complete the futures returned by
            // Authorizer#start (which we wait for before processing RPCs).
            info(s"Loading authorizer snapshot at offset ${newImage.offset()}")
            authorizer.loadSnapshot(newImage.acls().acls())
          } catch {
            case t: Throwable => faultHandler.handleFault("Error loading " +
              s"authorizer snapshot in $deltaName", t)
          }
        } else {
          try {
            // Because the changes map is a LinkedHashMap, the deltas will be returned in
            // the order they were performed.
            aclsDelta.changes().entrySet().forEach(e =>
              if (e.getValue.isPresent) {
                authorizer.addAcl(e.getKey, e.getValue.get())
              } else {
                authorizer.removeAcl(e.getKey)
              })
          } catch {
            case t: Throwable => faultHandler.handleFault("Error loading " +
              s"authorizer changes in $deltaName", t)
          }
        }
        if (!completedInitialLoad) {
          // If we are receiving this onMetadataUpdate call, that means the MetadataLoader has
          // loaded up to the local high water mark. So we complete the initial load, enabling
          // the authorizer.
          completedInitialLoad = true
          authorizer.completeInitialLoad()
        }
        case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do.
      }
    }
  }

  override def close(): Unit = {
    authorizer match {
      case Some(authorizer: ClusterMetadataAuthorizer) => authorizer.completeInitialLoad(new TimeoutException)
      case _ =>
    }
  }
}
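
A hedged usage sketch (not part of this commit): driving AclPublisher by hand, the way a unit
test might. MockFaultHandler and the Mockito mock(...) helper follow the existing tests'
conventions; treat the exact wiring as an assumption:

    import org.apache.kafka.image.loader.LoaderManifest
    import org.apache.kafka.image.{MetadataDelta, MetadataImage}
    import org.apache.kafka.metadata.authorizer.StandardAuthorizer
    import org.apache.kafka.server.fault.MockFaultHandler
    import org.mockito.Mockito.mock

    val authorizer = new StandardAuthorizer()
    val publisher = new AclPublisher(0, new MockFaultHandler("AclPublisher"), "broker", Some(authorizer))

    // A delta with no ACL changes is a no-op; with a SNAPSHOT manifest the
    // publisher would call loadSnapshot, otherwise it applies each ACL change
    // in commit order and then completes the authorizer's initial load.
    publisher.onMetadataUpdate(mock(classOf[MetadataDelta]), MetadataImage.EMPTY, mock(classOf[LoaderManifest]))
    publisher.close()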
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -29,8 +29,6 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
-import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.fault.FaultHandler

 import java.util.concurrent.CompletableFuture
@@ -107,7 +105,7 @@ class BrokerMetadataPublisher(
   var dynamicConfigPublisher: DynamicConfigPublisher,
   dynamicClientQuotaPublisher: DynamicClientQuotaPublisher,
   scramPublisher: ScramPublisher,
-  private val _authorizer: Option[Authorizer],
+  aclPublisher: AclPublisher,
   fatalFaultHandler: FaultHandler,
   metadataPublishingFaultHandler: FaultHandler
 ) extends MetadataPublisher with Logging {
@@ -229,43 +227,8 @@ class BrokerMetadataPublisher(
       // Apply SCRAM delta.
       scramPublisher.onMetadataUpdate(delta, newImage)

-      // Apply changes to ACLs. This needs to be handled carefully because while we are
-      // applying these changes, the Authorizer is continuing to return authorization
-      // results in other threads. We never want to expose an invalid state. For example,
-      // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
-      // we want to apply those changes in that order, not the reverse order! Otherwise
-      // there could be a window during which incorrect authorization results are returned.
-      Option(delta.aclsDelta()).foreach { aclsDelta =>
-        _authorizer match {
-          case Some(authorizer: ClusterMetadataAuthorizer) => if (aclsDelta.isSnapshotDelta) {
-            try {
-              // If the delta resulted from a snapshot load, we want to apply the new changes
-              // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
-              // first snapshot load, it will also complete the futures returned by
-              // Authorizer#start (which we wait for before processing RPCs).
-              authorizer.loadSnapshot(newImage.acls().acls())
-            } catch {
-              case t: Throwable => metadataPublishingFaultHandler.handleFault("Error loading " +
-                s"authorizer snapshot in $deltaName", t)
-            }
-          } else {
-            try {
-              // Because the changes map is a LinkedHashMap, the deltas will be returned in
-              // the order they were performed.
-              aclsDelta.changes().entrySet().forEach(e =>
-                if (e.getValue.isPresent) {
-                  authorizer.addAcl(e.getKey, e.getValue.get())
-                } else {
-                  authorizer.removeAcl(e.getKey)
-                })
-            } catch {
-              case t: Throwable => metadataPublishingFaultHandler.handleFault("Error loading " +
-                s"authorizer changes in $deltaName", t)
-            }
-          }
-          case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do.
-        }
-      }
+      // Apply ACL delta.
+      aclPublisher.onMetadataUpdate(delta, newImage, manifest)

       try {
         // Propagate the new image to the group coordinator.
@@ -35,8 +35,9 @@ import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
+import org.apache.kafka.controller.MockAclMutator
 import org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.AuthorizerTestServerInfo
-import org.apache.kafka.metadata.authorizer.{MockAclMutator, StandardAuthorizer}
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, IBP_2_0_IV1}
@@ -289,7 +289,7 @@ class BrokerMetadataPublisherTest {
       mock(classOf[DynamicConfigPublisher]),
       mock(classOf[DynamicClientQuotaPublisher]),
       mock(classOf[ScramPublisher]),
-      None,
+      mock(classOf[AclPublisher]),
       faultHandler,
       faultHandler
     )
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
@@ -27,10 +27,8 @@
 import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
 import org.apache.kafka.metadata.authorizer.StandardAcl;
 import org.apache.kafka.metadata.authorizer.StandardAclWithId;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult;
@@ -48,7 +46,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;

@@ -58,22 +55,12 @@
 /**
  * The AclControlManager manages any ACLs that are stored in the __cluster_metadata topic.
  * If the ACLs are stored externally (such as in ZooKeeper) then there will be nothing for
- * this manager to do, and the authorizer field will always be Optional.empty.
- *
- * Because the Authorizer is being concurrently used by other threads, we need to be
- * careful about snapshots. We don't want the Authorizer to act based on partial state
- * during the loading process. Therefore, unlike most of the other managers,
- * AclControlManager needs to receive callbacks when we start loading a snapshot and when
- * we finish. The prepareForSnapshotLoad callback clears the authorizer field, preventing
- * any changes from affecting the authorizer until completeSnapshotLoad is called.
- * Note that the Authorizer's start() method will block until the first snapshot load has
- * completed, which is another reason the prepare / complete callbacks are needed.
+ * this manager to do.
  */
 public class AclControlManager {
     static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
-        private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty();

         Builder setLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -85,32 +72,24 @@ Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
             return this;
         }

-        Builder setClusterMetadataAuthorizer(Optional<ClusterMetadataAuthorizer> authorizer) {
-            this.authorizer = authorizer;
-            return this;
-        }
-
         AclControlManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
-            return new AclControlManager(logContext, snapshotRegistry, authorizer);
+            return new AclControlManager(logContext, snapshotRegistry);
         }
     }

     private final Logger log;
     private final TimelineHashMap<Uuid, StandardAcl> idToAcl;
     private final TimelineHashSet<StandardAcl> existingAcls;
-    private final Optional<ClusterMetadataAuthorizer> authorizer;

-    AclControlManager(
+    private AclControlManager(
         LogContext logContext,
-        SnapshotRegistry snapshotRegistry,
-        Optional<ClusterMetadataAuthorizer> authorizer
+        SnapshotRegistry snapshotRegistry
     ) {
         this.log = logContext.logger(AclControlManager.class);
         this.idToAcl = new TimelineHashMap<>(snapshotRegistry, 0);
         this.existingAcls = new TimelineHashSet<>(snapshotRegistry, 0);
-        this.authorizer = authorizer;
     }

     ControllerResult<List<AclCreateResult>> createAcls(List<AclBinding> acls) {
@@ -227,26 +206,15 @@ static void validateFilter(AclBindingFilter filter) {
         }
     }

-    public void replay(
-        AccessControlEntryRecord record,
-        Optional<OffsetAndEpoch> snapshotId
-    ) {
+    public void replay(AccessControlEntryRecord record) {
         StandardAclWithId aclWithId = StandardAclWithId.fromRecord(record);
         idToAcl.put(aclWithId.id(), aclWithId.acl());
         existingAcls.add(aclWithId.acl());
-        if (!snapshotId.isPresent()) {
-            authorizer.ifPresent(a -> {
-                a.addAcl(aclWithId.id(), aclWithId.acl());
-            });
-        }
         log.info("Replayed AccessControlEntryRecord for {}, setting {}", record.id(),
             aclWithId.acl());
     }

-    public void replay(
-        RemoveAccessControlEntryRecord record,
-        Optional<OffsetAndEpoch> snapshotId
-    ) {
+    public void replay(RemoveAccessControlEntryRecord record) {
         StandardAcl acl = idToAcl.remove(record.id());
         if (acl == null) {
             throw new RuntimeException("Unable to replay " + record + ": no acl with " +
@@ -256,11 +224,6 @@ public void replay(
             throw new RuntimeException("Unable to replay " + record + " for " + acl +
                 ": acl not found " + "in existingAcls.");
         }
-        if (!snapshotId.isPresent()) {
-            authorizer.ifPresent(a -> {
-                a.removeAcl(record.id());
-            });
-        }
         log.info("Replayed RemoveAccessControlEntryRecord for {}, removing {}", record.id(), acl);
     }
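
The net effect on replay, rendered as a standalone Scala sketch (illustrative only, not Kafka
API): replay now touches only the controller's own state, and ACLs reach the Authorizer
downstream, via AclPublisher, after commit.

    final case class AclState(idToAcl: Map[String, String])

    // Apply a committed add: update local state only; no authorizer side effect.
    def replayAdd(state: AclState, id: String, acl: String): AclState =
      state.copy(idToAcl = state.idToAcl + (id -> acl))

    // Apply a committed removal; an unknown id is a fatal replay error.
    def replayRemove(state: AclState, id: String): AclState =
      if (state.idToAcl.contains(id)) state.copy(idToAcl = state.idToAcl - id)
      else throw new RuntimeException(s"Unable to replay: no acl with id $id found.")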
