KAFKA-14828: Remove R/W locks using persistent data structures #13437

Merged: 10 commits, Apr 21, 2023
StandardAuthorizerUpdateBenchmark.java (new file, package org.apache.kafka.jmh.acl)
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.jmh.acl;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.authorizer.StandardAclWithId;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.kafka.common.acl.AclOperation.READ;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 0)
@Measurement(iterations = 4)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class StandardAuthorizerUpdateBenchmark {
@Param({"25000", "50000", "75000", "100000"})
private int aclCount;
private final String resourceNamePrefix = "foo-bar35_resource-";
private static final KafkaPrincipal PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private StandardAuthorizer authorizer;
private final Set<Uuid> ids = new HashSet<>();

private List<StandardAclWithId> aclsToAdd = prepareAcls();

int index = 0;
private static final Random RANDOM = new Random(System.currentTimeMillis());

@Setup(Level.Trial)
public void setup() throws Exception {
authorizer = new StandardAuthorizer();
addAcls(aclCount);
}

@TearDown(Level.Trial)
public void tearDown() throws IOException {
authorizer.close();
}

@Benchmark
public void testAddAcl() {
StandardAclWithId aclWithId = aclsToAdd.get(index++);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
}

private List<StandardAclWithId> prepareAcls() {
return IntStream.range(0, 10000)
.mapToObj(i -> {
ResourceType resourceType = RANDOM.nextInt(10) > 7 ? ResourceType.GROUP : ResourceType.TOPIC;
String resourceName = resourceNamePrefix + i;
ResourcePattern resourcePattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
return aclsForResource(resourcePattern);
})
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

private List<StandardAclWithId> aclsForResource(ResourcePattern pattern) {
return IntStream.range(1, 256)
.mapToObj(i -> {
String p = PRINCIPAL.toString() + RANDOM.nextInt(100);
String h = "127.0.0." + i;
return new StandardAcl(pattern.resourceType(), pattern.name(), pattern.patternType(), p, h, READ, ALLOW);
})
.map(this::withId)
.collect(Collectors.toList());
}

private StandardAclWithId withId(StandardAcl acl) {
Uuid id = new Uuid(acl.hashCode(), acl.hashCode());
while (ids.contains(id)) {
id = Uuid.randomUuid();
}
ids.add(id);
return new StandardAclWithId(id, acl);
}

private void addAcls(int num) {
IntStream.range(0, num)
.mapToObj(aclsToAdd::get)
.forEach(aclWithId -> {
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
index++;
});
}
}
AclCache.java (new file, package org.apache.kafka.metadata.authorizer)
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.metadata.authorizer;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.immutable.ImmutableNavigableSet;

import java.util.ArrayList;
import java.util.List;

/**
* An immutable class that stores the ACLs in KRaft-based clusters.
*/
public class AclCache {
/**
* Contains all of the current ACLs sorted by (resource type, resource name).
*/
private final ImmutableNavigableSet<StandardAcl> aclsByResource;

/**
* Contains all of the current ACLs indexed by UUID.
*/
private final ImmutableMap<Uuid, StandardAcl> aclsById;

AclCache() {
this(ImmutableNavigableSet.empty(), ImmutableMap.empty());
}

private AclCache(final ImmutableNavigableSet<StandardAcl> aclsByResource, final ImmutableMap<Uuid, StandardAcl> aclsById) {
this.aclsByResource = aclsByResource;
this.aclsById = aclsById;
}

public ImmutableNavigableSet<StandardAcl> aclsByResource() {
return aclsByResource;
}

Iterable<AclBinding> acls(AclBindingFilter filter) {
List<AclBinding> aclBindingList = new ArrayList<>();
aclsByResource.forEach(acl -> {
AclBinding aclBinding = acl.toBinding();
if (filter.matches(aclBinding)) {
aclBindingList.add(aclBinding);
}
});
return aclBindingList;
}

int count() {
return aclsById.size();
}

StandardAcl getAcl(Uuid id) {
return aclsById.get(id);
}

AclCache addAcl(Uuid id, StandardAcl acl) {
Review comment from @Hangleton (Contributor), Apr 21, 2023:

Since writes are done on a single thread, the only case of concurrency we have to solve here is when multiple reads and a single write happen in parallel.

Do I get this right that the single-writer assumption stated in the PR description is critical to achieving consistency in the sequence of operations below (e.g. that the state checked at line 77 is still valid at line 81)? Should multiple writes happen concurrently, that would no longer be the case, right? Is there a way to enforce the single-writer condition? Or shouldn't the cache preserve consistency under multiple writers, since it has no control over how many actors can update its state concurrently?

Reply from the PR author:

Thanks @Hangleton for the comment. The single-writer condition will always hold for the Authorizer because we have to apply ACL changes in the order of their arrival. In KRaft, that order is the order in which they are written to the metadata topic, so we would never have multiple threads reading from the metadata topic and writing to the AclCache. That is why there is no lock on writes here.

(A minimal sketch of this single-writer, immutable-snapshot pattern appears after the AclCache class below.)

StandardAcl prevAcl = this.aclsById.get(id);
if (prevAcl != null) {
throw new RuntimeException("An ACL with ID " + id + " already exists.");
}

ImmutableMap<Uuid, StandardAcl> aclsById = this.aclsById.updated(id, acl);

if (this.aclsByResource.contains(acl)) {
throw new RuntimeException("Unable to add the ACL with ID " + id +
" to aclsByResource");
}

ImmutableNavigableSet<StandardAcl> aclsByResource = this.aclsByResource.added(acl);
return new AclCache(aclsByResource, aclsById);
}

AclCache removeAcl(Uuid id) {
StandardAcl acl = this.aclsById.get(id);
if (acl == null) {
throw new RuntimeException("ID " + id + " not found in aclsById.");
}
ImmutableMap<Uuid, StandardAcl> aclsById = this.aclsById.removed(id);

if (!this.aclsByResource.contains(acl)) {
throw new RuntimeException("Unable to remove the ACL with ID " + id +
" from aclsByResource");
}

ImmutableNavigableSet<StandardAcl> aclsByResource = this.aclsByResource.removed(acl);
return new AclCache(aclsByResource, aclsById);
}
}
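
For context on the review discussion above: the point of removing the read/write locks is that AclCache is a persistent (immutable) structure, so the single metadata-replay thread can build a new cache and publish it with a single reference write, while concurrent readers keep using whichever snapshot they already hold. The holder class below is only a minimal sketch of that pattern under the stated single-writer assumption; AclCacheHolder and its method names are invented for illustration and are not part of this PR or of StandardAuthorizer.

package org.apache.kafka.metadata.authorizer;

import org.apache.kafka.common.Uuid;

// Illustrative sketch only: AclCacheHolder is a hypothetical class, not part of this PR.
// It shows how a single writer can publish new immutable AclCache snapshots while
// readers stay lock-free.
class AclCacheHolder {

    // Readers always see whichever fully built snapshot was published last; the
    // volatile write/read pair provides the required happens-before ordering.
    private volatile AclCache cache = new AclCache();

    // Invoked only from the single metadata-replay thread. Because AclCache is
    // immutable, addAcl builds a brand-new cache; the check-then-add inside it is
    // safe only because no other writer can interleave.
    void applyAddAclRecord(Uuid id, StandardAcl acl) {
        cache = cache.addAcl(id, acl);
    }

    void applyRemoveAclRecord(Uuid id) {
        cache = cache.removeAcl(id);
    }

    // Called from any number of request-handler threads without locking.
    StandardAcl lookup(Uuid id) {
        return cache.getAcl(id);
    }
}

If multiple writers were ever allowed, the check-then-publish sequence inside addAcl/removeAcl could race and lose updates, which is exactly the concern raised in the comment; funnelling all apply* calls through the single metadata-replay thread is what keeps the sequence consistent without a write lock.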