Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13646: Implement KIP-801: KRaft authorizer #11649

Merged
merged 6 commits into from Feb 9, 2022

Conversation

cmccabe
Copy link
Contributor

@cmccabe cmccabe commented Jan 5, 2022

Currently, when using KRaft mode, users still have to have an Apache ZooKeeper instance if they want to use AclAuthorizer. We should have a built-in Authorizer for KRaft mode that does not depend on ZooKeeper. This PR introduces such an authorizer, called StandardAuthorizer. See KIP-801 for a full description of the new Authorizer design.

Authorizer.java: add aclCount API as described in KIP-801. StandardAuthorizer is currently the only authorizer that implements it, but eventually we may implement it for AclAuthorizer and others as well.

ControllerApis.scala: fix a bug where createPartitions was authorized using CREATE on the topic resource rather than ALTER on the topic resource as it should have been.

QuorumTestHarness: rename the controller endpoint to CONTROLLER for consistency (the brokers already called it that). This is relevant in AuthorizerIntegrationTest where we are examining endpoint names. Also add the controllerServers call.

TestUtils.scala: adapt the ACL functions to be usable from KRaft, by ensuring that they use the Authorizer from the current active controller.

BrokerMetadataPublisher.scala: add broker-side ACL application logic.

Controller.java: add ACL APIs. Also add a findAllTopicIds API in order to make junit tests that use KafkaServerTestHarness#getTopicNames and KafkaServerTestHarness#getTopicIds work smoothly.

AuthorizerIntegrationTest.scala: convert over testAuthorizationWithTopicExisting (more to come soon)

QuorumController.java: add logic for replaying ACL-based records. This means storing them in the new AclControlManager object, and integrating them into controller snapshots. It also means applying the changes in the Authorizer, if one is configured. In renounce, when reverting to a snapshot, also set newBytesSinceLastSnapshot to 0.

There are a few follow-up items not in this PR. One is actually creating the new acl count metric. Another is implementing the "start after we reach the high water mark" authorizer logic. Also, this doesn't implement the new busy error code. Finally, we need to implement logIfDenied.

@cmccabe cmccabe changed the title KRaft authorizer KRaft authorizer [WIP] Jan 6, 2022
@cmccabe cmccabe changed the title KRaft authorizer [WIP] KAFKA-13646: Implement KIP-801: KRaft authorizer Feb 4, 2022
@cmccabe cmccabe added the kraft label Feb 4, 2022
@@ -689,7 +689,7 @@ class ControllerApis(val requestChannel: RequestChannel,
def handleCreatePartitions(request: RequestChannel.Request): Unit = {
val future = createPartitions(request.body[CreatePartitionsRequest].data,
authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n))
names => authHelper.filterByAuthorized(request.context, ALTER, TOPIC, names)(n => n))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to suggest pulling this fix out so that we could backport more easily, but I guess it doesn't matter since kraft never had an authorizer before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should pull out the CREATE -> ALTER fix since in theory you could be using KRaft + AclAuthorizer. If you want I can do that now (but it won't have a unit test, hope that's OK).

(It is tested by AuthorizerIntegrationTest but I wouldn't want to backport that to 3.{0,1})

getController().kafkaController.controllerContext.topicNames.toMap
if (isKRaftTest()) {
val result = new util.HashMap[Uuid, String]()
controllerServer.controller.findAllTopicIds(Long.MaxValue).get().entrySet().forEach {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I wonder if we may as well add a findAllTopicNames (or something like that) to go along with findAllTopicIds.

Copy link
Contributor Author

@cmccabe cmccabe Feb 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't see why that would need to be in the controller API.

In general I wish this test harness function didn't exist (maybe we should mark it as @deprecated).

@@ -90,6 +91,16 @@
CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(long deadlineNs,
Collection<String> topicNames);

/**
* Find the ids for all topic names. Note that this function should only used for
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: only be used

properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString())
} else {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it not necessary to identify the broker as a super-user for AclAuthorizer?

Copy link
Contributor Author

@cmccabe cmccabe Feb 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a chicken / egg problem with the broker connecting to the controller quorum in the absence of ACLs. Basically the bootstrapping problem.

Making the broker principal a super user doesn't really change the test logic since the test in ZK mode just gives the broker all the permissions it needs in the setUp function, then moves on (it doesn't test cases where the broker lacks permissions)

static Set<String> getConfiguredSuperUsers(Map<String, ?> configs) {
Object configValue = configs.get(SUPER_USERS_CONFIG);
if (configValue == null) return Collections.emptySet();
String stringValue = configValue.toString().trim();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic in AclAuthorizer is a little different. We do the split first and then trim:

    superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
      case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
    }.getOrElse(Set.empty[KafkaPrincipal])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, we can do it that way here, too. I suppose we should also call SecurityUtils.parseKafkaPrincipal just to filter out bad values.

/**
* Contains all of the current ACLs sorted by (resource type, pattern type, resource name).
*/
private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a follow-up, but we will probably need to add something like AclAuthorizerBenchmark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StandardAcl acl = iterator.next();
if (!aclIsInCorrectSection(action, acl)) break;
AuthorizationResult result = findResult(action, requestContext, acl);
if (ALLOWED.equals(result)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ALLOWED == result

Add logIfDenied = true to a few places where we are doing an initial check against a CLUSTER resource.

Convert over the other tests in AuthorizerIntegrationTest. Add a special case for the result of deleteTopics since in KRaft mode we honor DESCRIBE permissions there, and in ZK mode we do not. In testPatternSubscriptionMatchingInternalTopic, retry subscribing. In testCreatePermissionNeededToReadFromNonExistentTopic, don't depend on ZK to check topic existence. In a few spots, check that an ACL gets propagated to all authorizers rather than just waiting for it to make it to a single authorizer.

TestUtils: extend timeout of waitAndVerifyAcls from 15 seconds to 45 just to avoid rare flakes. Rename some variables to avoid shadowing.

StandardAuthorizerData: add trace logging for addAcl / removeAcl. Re-arrange StandardAcl sort order so that we always step forward in the map. Fix a bug in the ACL scanning that could have led to ACLs being ignored.

StandardAuthorizerTest: add two better authorization tests.
@cmccabe
Copy link
Contributor Author

cmccabe commented Feb 8, 2022

Failed tests were:

ConnectionQuotasTest#testNonDefaultConnectionCountLimitAndRateLimit
DynamicBrokerReconfigurationTest#testThreadPoolResize

These tests are not related to this change (they don't deal with the new authorizer) and running them locally, they pass.

AuthorizationResultBuilder builder) {
NavigableSet<StandardAcl> headSet = aclsByResource.tailSet(exemplar, true);
String resourceName = action.resourcePattern().name();
for (Iterator<StandardAcl> iterator = headSet.iterator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I don't think we need the iterator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really prefer to be explicit here, since we are iterating over a set that is potentially changing. I know that Java will only evaluate the y in for (x: y) {} once but I think there are some languages that don't, so no point being obscure.

StandardAcl exemplar,
AuthorizableRequestContext requestContext,
AuthorizationResultBuilder builder) {
NavigableSet<StandardAcl> headSet = aclsByResource.tailSet(exemplar, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename this to tailSet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops. hehe. fixed.

private final AuthorizationResult defaultResult;

/**
* Contains all of the current ACLs sorted by (resource type, pattern type, resource name).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: need to update this since resource name is before pattern type now

AclPermissionType.DENY));
}

private static final int signum(int input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: final is redundant for private static method

StandardAcl exemplar = new StandardAcl(
action.resourcePattern().resourceType(),
action.resourcePattern().name(),
PatternType.UNKNOWN,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth mentioning somewhere in here that UNKNOWN pattern type is ordered before others since the initial search depends on that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I will add a comment.

List<ApiMessageAndVersion> records = new ArrayList<>(acls.size());
for (AclBinding acl : acls) {
try {
validateNewAcl(acl);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These validations seem to make sense. Currently they are all implemented by AclAuthorizer, so I wanted to ask whether it makes sense to pull them up. Basically are there any cases where a custom authorizer might be depending on any of the cases this validation is excluding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make sense to pull them up eventually in a follow-up PR.

@cmccabe
Copy link
Contributor Author

cmccabe commented Feb 9, 2022

Test failures are not related

Copy link
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. There are a few minor improvements I noticed while reviewing again, but I will submit a separate patch for them.

@hachikuji hachikuji merged commit d35283f into apache:trunk Feb 9, 2022
hachikuji added a commit that referenced this pull request Feb 11, 2022
…#11745)

In #11649, we fixed one permission inconsistency between kraft and zk authorization for the `CreatePartitions` request. Previously kraft was requiring `CREATE` permission on the `Topic` resource when it should have required `ALTER`. A second inconsistency is that kraft was also allowing `CREATE` on the `Cluster` resource, which is not supported in zk clusters and was not documented in KIP-195: https://cwiki.apache.org/confluence/display/KAFKA/KIP-195%3A+AdminClient.createPartitions. This patch fixes this inconsistency and adds additional test coverage for both cases.

Reviewers: José Armando García Sancio <jsancio@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants