Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ License Version 2.0:
- caffeine-3.2.0
- commons-beanutils-1.11.0
- commons-collections-3.2.2
- commons-collections4-4.5.0
- commons-digester-2.1
- commons-lang3-3.18.0
- commons-logging-1.3.5
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1856,6 +1856,7 @@ project(':clients') {
implementation libs.opentelemetryProto
implementation libs.protobuf
implementation libs.slf4jApi
implementation libs.commonsCollection

// libraries which should be added as runtime dependencies in generated pom.xml should be defined here:
shadowed libs.zstd
Expand Down
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
</subpackage>

<subpackage name="authorizer">
<allow pkg="org.apache.commons.collections4" />
</subpackage>

</subpackage>

<subpackage name="shell">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.server.authorizer;

import org.apache.commons.collections4.trie.PatriciaTrie;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
Expand All @@ -32,12 +33,7 @@
import org.apache.kafka.common.utils.SecurityUtils;

import java.io.Closeable;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CompletionStage;

/**
Expand Down Expand Up @@ -200,16 +196,11 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
AclBindingFilter aclFilter = new AclBindingFilter(
resourceTypeFilter, AccessControlEntryFilter.ANY);

EnumMap<PatternType, Set<String>> denyPatterns =
new EnumMap<>(PatternType.class) {{
put(PatternType.LITERAL, new HashSet<>());
put(PatternType.PREFIXED, new HashSet<>());
}};
EnumMap<PatternType, Set<String>> allowPatterns =
new EnumMap<>(PatternType.class) {{
put(PatternType.LITERAL, new HashSet<>());
put(PatternType.PREFIXED, new HashSet<>());
}};
Set<String> denyPatternsLiteral = new HashSet<>();
PatriciaTrie<Boolean> denyPatternsPrefixed = new PatriciaTrie<>();

Set<String> allowPatternsLiteral = new HashSet<>();
PatriciaTrie<Boolean> allowPatternsPrefixed = new PatriciaTrie<>();

boolean hasWildCardAllow = false;

Expand All @@ -236,10 +227,10 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
// If wildcard deny exists, return deny directly
if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
return AuthorizationResult.DENIED;
denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
denyPatternsLiteral.add(binding.pattern().name());
break;
case PREFIXED:
denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
denyPatternsPrefixed.put(binding.pattern().name(), Boolean.TRUE);
break;
default:
}
Expand All @@ -255,10 +246,10 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
hasWildCardAllow = true;
continue;
}
allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
allowPatternsLiteral.add(binding.pattern().name());
break;
case PREFIXED:
allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
allowPatternsPrefixed.put(binding.pattern().name(), Boolean.TRUE);
break;
default:
}
Expand All @@ -269,27 +260,38 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
}

// For any literal allowed, if there's no dominant literal and prefix denied, return allow.
for (String allowStr : allowPatternsLiteral) {
if (denyPatternsLiteral.contains(allowStr)) {
continue;
}

boolean hasDominatedDeny = false;

if(!denyPatternsPrefixed.isEmpty()) {
hasDominatedDeny = !denyPatternsPrefixed.headMap(allowStr).isEmpty() || denyPatternsPrefixed.containsKey(allowStr);
}

// String denyPrefix = denyPatternsPrefixed.selectKey(allowStr);
// boolean hasDominatedDeny = (denyPrefix != null);

// boolean hasDominatedDeny = !denyPatternsPrefixed.prefixMap(allowStr).isEmpty();

if (!hasDominatedDeny)
return AuthorizationResult.ALLOWED;
}

// For any prefix allowed, if there's no dominant prefix denied, return allow.
for (Map.Entry<PatternType, Set<String>> entry : allowPatterns.entrySet()) {
for (String allowStr : entry.getValue()) {
if (entry.getKey() == PatternType.LITERAL
&& denyPatterns.get(PatternType.LITERAL).contains(allowStr))
continue;
StringBuilder sb = new StringBuilder();
boolean hasDominatedDeny = false;
for (char ch : allowStr.toCharArray()) {
sb.append(ch);
if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) {
hasDominatedDeny = true;
break;
}
}
if (!hasDominatedDeny)
return AuthorizationResult.ALLOWED;
for (String allowStr : allowPatternsPrefixed.keySet()) {
boolean hasDominatedDeny = false;

if(!denyPatternsPrefixed.isEmpty()) {
hasDominatedDeny = !denyPatternsPrefixed.headMap(allowStr).isEmpty() || denyPatternsPrefixed.containsKey(allowStr);
}

if (!hasDominatedDeny)
return AuthorizationResult.ALLOWED;
}

return AuthorizationResult.DENIED;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,26 @@ trait BaseAuthorizerTest {
"User1 from host1 should not have WRITE access to any topic")
}

@Test
def testAuthorizeByResourceTypeLiteralResourceDenyDominate(): Unit = {
  // A PREFIXED deny on "abcd" must dominate a LITERAL allow on "abcde",
  // since "abcd" is a prefix of "abcde".
  val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
  val clientHost = InetAddress.getByName("192.168.1.1")
  val prefixedPattern = new ResourcePattern(GROUP, "abcd", PREFIXED)
  val literalPattern = new ResourcePattern(GROUP, "abcde", LITERAL)

  val requestContext = newRequestContext(principal, clientHost)
  val allowEntry = new AccessControlEntry(principal.toString, clientHost.getHostAddress, READ, ALLOW)
  val denyEntry = new AccessControlEntry(principal.toString, clientHost.getHostAddress, READ, DENY)

  // With only the literal allow in place, READ on some group is authorized.
  addAcls(authorizer, Set(allowEntry), literalPattern)
  assertTrue(authorizeByResourceType(authorizer, requestContext, READ, ResourceType.GROUP),
    "User1 from host1 should have READ access to at least one group")

  // Adding the dominating prefixed deny must revoke that access.
  addAcls(authorizer, Set(denyEntry), prefixedPattern)
  assertFalse(authorizeByResourceType(authorizer, requestContext, READ, ResourceType.GROUP),
    "User1 from host1 now should not have READ access to any group")
}

@Test
def testAuthorizeByResourceTypePrefixedResourceDenyDominate(): Unit = {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
Expand Down
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ versions += [
caffeine: "3.2.0",
bndlib: "7.1.0",
checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
commonsCollection: "4.5.0",
commonsLang: "3.18.0",
commonsValidator: "1.10.0",
classgraph: "4.8.179",
Expand Down Expand Up @@ -150,6 +151,7 @@ libs += [
bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib",
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsCollection: "org.apache.commons:commons-collections4:$versions.commonsCollection",
commonsLang: "org.apache.commons:commons-lang3:$versions.commonsLang",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -51,6 +53,12 @@
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;

import java.io.IOException;
import java.net.InetAddress;
Expand All @@ -71,7 +79,7 @@
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class AuthorizerBenchmark {

@Param({"10000", "50000", "200000"})
@Param({"10000", "50000"})
private int resourceCount;
//no. of. rules per resource
@Param({"10", "50"})
Expand Down Expand Up @@ -197,6 +205,11 @@ private Boolean shouldDeny() {
return rand.nextDouble() * 100.0 - eps < denyPercentage;
}

/**
 * Re-attaches a fresh plugin-metrics context to the authorizer before each
 * benchmark iteration, so per-iteration metrics do not accumulate across
 * iterations.
 */
@Setup(Level.Iteration)
public void setupIteration() {
    // Use a default-capacity map for the metrics tags: the previous
    // new HashMap<>(1000000) allocated a ~2^20-slot bucket array for an
    // empty map on every iteration, wasting memory and distorting the
    // GC profile of the benchmark under measurement.
    authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), new HashMap<>()));
}

@TearDown(Level.Trial)
public void tearDown() throws IOException {
authorizer.close();
Expand Down