diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java
index 6539fb38fbe9b..7299eaee654ac 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataCommand.java
@@ -28,9 +28,22 @@ public class AuthenticationDataCommand implements AuthenticationDataSource {
protected final String authData;
protected final SocketAddress remoteAddress;
protected final SSLSession sslSession;
+ protected String subscription;
public AuthenticationDataCommand(String authData) {
- this(authData, null, null);
+ this(authData, null, null, null);
+ }
+
+ public AuthenticationDataCommand(String authData, String subscription) {
+ this(authData, null, null, subscription);
+ }
+
+ public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession,
+ String subscription) {
+ this.authData = authData;
+ this.remoteAddress = remoteAddress;
+ this.sslSession = sslSession;
+ this.subscription = subscription;
}
public AuthenticationDataCommand(String authData, SocketAddress remoteAddress, SSLSession sslSession) {
@@ -85,4 +98,21 @@ public Certificate[] getTlsCertificates() {
}
}
+ /*
+ * Subscription
+ */
+ @Override
+ public boolean hasSubscription() {
+ return this.subscription != null;
+ }
+
+ @Override
+ public void setSubscription(String subscription) {
+ this.subscription = subscription;
+ }
+
+ @Override
+ public String getSubscription() {
+ return subscription;
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
index b72b99beaae9f..eb9ed2bc747e6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSource.java
@@ -127,4 +127,27 @@ default boolean hasDataFromPeer() {
default SocketAddress getPeerAddress() {
return null;
}
+
+ /**
+ * Check if subscription is defined available.
+ *
+ * @return true if this authentication data contain subscription
+ */
+ default boolean hasSubscription() {
+ return false;
+ }
+
+ /**
+ * Subscription name can be necessary for consumption
+ *
+ * @return a String
containing the subscription name
+ */
+ default String getSubscription() { return null; }
+
+ /**
+ * Subscription name can be necessary for consumption
+ *
+ * @return a String
containing the subscription name
+ */
+ default void setSubscription(String subscription) { };
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 572e4031d3271..c4fcc5ed860e8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -29,7 +30,14 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
/**
* Provider of authorization mechanism
@@ -186,4 +194,114 @@ CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName namespac
CompletableFuture grantPermissionAsync(TopicName topicName, Set actions, String role,
String authDataJson);
+ /**
+ * Grant authorization-action permission on a tenant to the given client
+ * @param tenantName
+ * @param originalRole role not overriden by proxy role if request do pass through proxy
+ * @param role originalRole | proxyRole if the request didn't pass through proxy
+ * @param operation
+ * @param authData
+ * @return CompletableFuture
+ */
+ default CompletableFuture allowTenantOperationAsync(String tenantName, String originalRole, String role,
+ TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("allowTenantOperation(%s) on tenant %s is not supported by the Authorization" +
+ " provider you are using.",
+ operation.toString(), tenantName)));
+ }
+
+ default Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation,
+ AuthenticationDataSource authData) {
+ try {
+ return allowTenantOperationAsync(tenantName, originalRole, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+ /**
+ * Grant authorization-action permission on a namespace to the given client
+ * @param namespaceName
+ * @param originalRole role not overriden by proxy role if request do pass through proxy
+ * @param role originalRole | proxyRole if the request didn't pass through proxy
+ * @param operation
+ * @param authData
+ * @return CompletableFuture
+ */
+ default CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
+ String role, NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("NamespaceOperation is not supported by the Authorization provider you are using."));
+ }
+
+ default Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role,
+ NamespaceOperation operation, AuthenticationDataSource authData) {
+ try {
+ return allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+ /**
+ * Grant authorization-action permission on a namespace to the given client
+ * @param namespaceName
+ * @param originalRole role not overriden by proxy role if request do pass through proxy
+ * @param role originalRole | proxyRole if the request didn't pass through proxy
+ * @param operation
+ * @param authData
+ * @return CompletableFuture
+ */
+ default CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+ PolicyOperation operation, String originalRole,
+ String role, AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("NamespacePolicyOperation is not supported by the Authorization provider you are using."));
+ }
+
+ default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation,
+ String originalRole, String role, AuthenticationDataSource authData) {
+ try {
+ return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+
+ /**
+ * Grant authorization-action permission on a topic to the given client
+ * @param topic
+ * @param originalRole role not overriden by proxy role if request do pass through proxy
+ * @param role originalRole | proxyRole if the request didn't pass through proxy
+ * @param operation
+ * @param authData
+ * @return CompletableFuture
+ */
+ default CompletableFuture allowTopicOperationAsync(TopicName topic, String originalRole, String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("TopicOperation is not supported by the Authorization provider you are using."));
+ }
+
+ default Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation,
+ AuthenticationDataSource authData) {
+ try {
+ return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 3bf4458a7c1ba..e92ab8400addb 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -21,12 +21,19 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -318,4 +325,170 @@ public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceN
AuthenticationDataSource authenticationData) {
return provider.allowFunctionOpsAsync(namespaceName, role, authenticationData);
}
+
+ /**
+ * Grant authorization-action permission on a tenant to the given client
+ *
+ * @param tenantName
+ * @param operation
+ * @param originalRole
+ * @param role
+ * @param authData
+ * additional authdata in json for targeted authorization provider
+ * @return IllegalArgumentException when tenant not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture allowTenantOperationAsync(String tenantName, TenantOperation operation,
+ String originalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.allowTenantOperationAsync(tenantName, originalRole, role, operation, authData);
+ }
+
+ return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+ "allowTenantOperationAsync"));
+ }
+
+ public Boolean allowTenantOperation(String tenantName, TenantOperation operation, String orignalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return true;
+ }
+
+ if (provider != null) {
+ return provider.allowTenantOperation(tenantName, orignalRole, role, operation, authData);
+ }
+
+ throw new IllegalStateException("No authorization provider configured for allowTenantOperation");
+ }
+
+ /**
+ * Grant authorization-action permission on a namespace to the given client
+ *
+ * @param namespaceName
+ * @param operation
+ * @param originalRole
+ * @param role
+ * @param authData
+ * additional authdata in json for targeted authorization provider
+ * @return IllegalArgumentException when namespace not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName,
+ NamespaceOperation operation,
+ String originalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData);
+ }
+
+ return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+ "allowNamespaceOperationAsync"));
+ }
+
+ public Boolean allowNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation,
+ String originalPrincipal, String role, AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return true;
+ }
+
+ if (provider != null) {
+ return provider.allowNamespaceOperation(namespaceName, originalPrincipal, role, operation, authData);
+ }
+
+ throw new IllegalStateException("No authorization provider configured for allowNamespaceOperation");
+ }
+
+ /**
+ * Grant authorization-action permission on a namespace to the given client
+ *
+ * @param namespaceName
+ * @param operation
+ * @param originalRole
+ * @param role
+ * @param authData
+ * additional authdata in json for targeted authorization provider
+ * @return IllegalArgumentException when namespace not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+ PolicyOperation operation, String originalRole,
+ String role, AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData);
+ }
+
+ return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+ "allowNamespacePolicyOperationAsync"));
+ }
+
+ public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy,
+ PolicyOperation operation, String originalPrincipal, String role,
+ AuthenticationDataHttps authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return true;
+ }
+
+ if (provider != null) {
+ return provider.allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal, role, authData);
+ }
+
+ throw new IllegalStateException("No authorization provider configured for allowNamespacePolicyOperation");
+ }
+
+ /**
+ * Grant authorization-action permission on a topic to the given client
+ *
+ * @param topicName
+ * @param operation
+ * @param role
+ * @param authData
+ * additional authdata in json for targeted authorization provider
+ * @return IllegalArgumentException when namespace not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture allowTopicOperationAsync(TopicName topicName, TopicOperation operation,
+ String originalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.allowTopicOperationAsync(topicName, originalRole, role, operation, authData);
+ }
+
+ return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " +
+ "allowTopicOperationAsync"));
+ }
+
+ public Boolean allowTopicOperation(TopicName topicName, TopicOperation operation,
+ String orignalRole, String role,
+ AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return true;
+ }
+
+ if (provider != null) {
+ return provider.allowTopicOperation(topicName, orignalRole, role, operation, authData);
+ }
+
+ throw new IllegalStateException("No authorization provider configured for allowTopicOperation");
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index b025f806f8636..40b20217d4200 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -27,8 +27,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -36,8 +37,17 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.Policies;
import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
+
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@@ -46,7 +56,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
+import javax.ws.rs.core.Response;
/**
* Default authorization provider that stores authorization policies under local-zookeeper.
@@ -501,4 +511,115 @@ private void validatePoliciesReadOnlyAccess() {
}
}
+ @Override
+ public CompletableFuture allowTenantOperationAsync(String tenantName, String originalRole, String role,
+ TenantOperation operation,
+ AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(tenantName, originalRole, role, authData);
+ }
+
+ @Override
+ public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
+ String role, NamespaceOperation operation,
+ AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+ }
+
+ @Override
+ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+ PolicyOperation operation, String originalRole,
+ String role, AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData);
+ }
+
+ @Override
+ public CompletableFuture allowTopicOperationAsync(TopicName topicName, String originalRole, String role,
+ TopicOperation operation,
+ AuthenticationDataSource authData) {
+ CompletableFuture isAuthorizedFuture;
+
+ switch (operation) {
+ case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, role, authData);
+ break;
+ case PRODUCE: isAuthorizedFuture= canProduceAsync(topicName, role, authData);
+ break;
+ case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription());
+ break;
+ default: isAuthorizedFuture = FutureUtil.failedFuture(
+ new IllegalStateException("TopicOperation is not supported."));
+ }
+
+ CompletableFuture isSuperUserFuture = isSuperUser(role, conf);
+
+ return isSuperUserFuture
+ .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> isSuperUser || isAuthorized);
+ }
+
+ private static String path(String... parts) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("/admin/");
+ Joiner.on('/').appendTo(sb, parts);
+ return sb.toString();
+ }
+
+ private CompletableFuture validateTenantAdminAccess(String tenantName, String originalRole, String role,
+ AuthenticationDataSource authData) {
+ try {
+ TenantInfo tenantInfo = configCache.propertiesCache()
+ .get(path(POLICIES, tenantName))
+ .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
+
+ validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+
+ if (role != null && conf.getProxyRoles().contains(role)) {
+ // role check
+ CompletableFuture isRoleSuperUserFuture = isSuperUser(role, conf);
+ CompletableFuture isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
+ CompletableFuture isRoleAuthorizedFuture = isRoleSuperUserFuture
+ .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
+ isRoleSuperUser || isRoleTenantAdmin);
+
+ // originalRole check
+ CompletableFuture isOriginalRoleSuperUserFuture = isSuperUser(originalRole, conf);
+ CompletableFuture isOriginalRoleTenantAdminFuture = isTenantAdmin(tenantName, originalRole,
+ tenantInfo, authData);
+ CompletableFuture isOriginalRoleAuthorizedFuture = isOriginalRoleSuperUserFuture
+ .thenCombine(isOriginalRoleTenantAdminFuture, (isOriginalRoleSuperUser, isOriginalRoleTenantAdmin) ->
+ isOriginalRoleSuperUser || isOriginalRoleTenantAdmin);
+
+ // merging
+ return isRoleAuthorizedFuture
+ .thenCombine(isOriginalRoleAuthorizedFuture, (isRoleAuthorized, isOriginalRoleAuthorized) ->
+ isRoleAuthorized && isOriginalRoleAuthorized);
+ } else {
+ // role check
+ CompletableFuture isRoleSuperUserFuture = isSuperUser(role, conf);
+ CompletableFuture isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
+ return isRoleSuperUserFuture
+ .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
+ isRoleSuperUser || isRoleTenantAdmin);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("Failed to get tenant info data for non existing tenant {}", tenantName);
+ throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
+ } catch (Exception e) {
+ log.error("Failed to get tenant {}", tenantName, e);
+ throw new RestException(e);
+ }
+ }
+
+ private static void validateOriginalPrincipal(Set proxyRoles, String authenticatedPrincipal,
+ String originalPrincipal) {
+ if (proxyRoles.contains(authenticatedPrincipal)) {
+ // Request has come from a proxy
+ if (StringUtils.isBlank(originalPrincipal)) {
+ log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
+ throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy.");
+ }
+ if (proxyRoles.contains(originalPrincipal)) {
+ log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
+ throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
+ }
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b0504eebde222..8a04adf3c3dff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -80,15 +80,19 @@
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.LocalPolicies;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.KeeperException;
@@ -103,7 +107,7 @@ public abstract class NamespacesBase extends AdminResource {
private static final long MAX_BUNDLES = ((long) 1) << 32;
protected List internalGetTenantNamespaces(String tenant) {
- validateAdminAccessForTenant(tenant);
+ validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
try {
return getListOfNamespaces(tenant);
@@ -117,9 +121,8 @@ protected List internalGetTenantNamespaces(String tenant) {
}
protected void internalCreateNamespace(Policies policies) {
+ validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
validatePoliciesReadOnlyAccess();
- validateAdminAccessForTenant(namespaceName.getTenant());
-
validatePolicies(namespaceName, policies);
try {
@@ -138,7 +141,7 @@ protected void internalCreateNamespace(Policies policies) {
@SuppressWarnings("deprecation")
protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
validatePoliciesReadOnlyAccess();
// ensure that non-global namespace is directed to the correct cluster
@@ -282,7 +285,7 @@ && getPartitionedTopicList(TopicDomain.persistent).isEmpty()
@SuppressWarnings("deprecation")
protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
validatePoliciesReadOnlyAccess();
// ensure that non-global namespace is directed to the correct cluster
@@ -353,7 +356,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori
}
protected void internalGrantPermissionOnNamespace(String role, Set actions) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -384,7 +387,7 @@ protected void internalGrantPermissionOnNamespace(String role, Set a
protected void internalGrantPermissionOnSubscription(String subscription, Set roles) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -414,7 +417,7 @@ protected void internalGrantPermissionOnSubscription(String subscription, Set internalGetNamespaceReplicationClusters() {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ);
+
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot get the replication clusters for a non-global namespace");
@@ -467,7 +472,7 @@ protected Set internalGetNamespaceReplicationClusters() {
}
protected void internalSetNamespaceReplicationClusters(List clusterIds) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Set replicationClusterSet = Sets.newHashSet(clusterIds);
@@ -525,7 +530,7 @@ protected void internalSetNamespaceReplicationClusters(List clusterIds)
}
protected void internalSetNamespaceMessageTTL(int messageTTL) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.TTL, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (messageTTL < 0) {
@@ -820,7 +825,7 @@ protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncRespons
}
protected void internalModifyDeduplication(boolean enableDeduplication) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Entry policiesNode = null;
@@ -856,9 +861,8 @@ protected void internalModifyDeduplication(boolean enableDeduplication) {
@SuppressWarnings("deprecation")
protected void internalUnloadNamespace(AsyncResponse asyncResponse) {
- log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName);
-
validateSuperUserAccess();
+ log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName);
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
@@ -903,11 +907,10 @@ protected void internalUnloadNamespace(AsyncResponse asyncResponse) {
protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffinityGroup) {
+ validateSuperUserAccess();
log.info("[{}] Setting bookie-affinity-group {} for namespace {}", clientAppId(), bookieAffinityGroup,
this.namespaceName);
- validateSuperUserAccess();
-
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
@@ -994,9 +997,9 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
@SuppressWarnings("deprecation")
public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritative) {
+ validateSuperUserAccess();
log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
- validateSuperUserAccess();
Policies policies = getNamespacePolicies(namespaceName);
NamespaceBundle bundle = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName.toString(), bundleRange);
@@ -1042,9 +1045,9 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat
@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
+ validateSuperUserAccess();
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
- validateSuperUserAccess();
Policies policies = getNamespacePolicies(namespaceName);
if (namespaceName.isGlobal()) {
@@ -1095,8 +1098,8 @@ private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(Str
}
protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
- log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
Entry policiesNode = null;
@@ -1132,7 +1135,7 @@ protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
}
protected PublishRate internalGetPublishRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
PublishRate publishRate = policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
@@ -1146,8 +1149,8 @@ protected PublishRate internalGetPublishRate() {
@SuppressWarnings("deprecation")
protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
- log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
Entry policiesNode = null;
@@ -1185,7 +1188,7 @@ protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
@SuppressWarnings("deprecation")
protected DispatchRate internalGetTopicDispatchRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
@@ -1201,8 +1204,8 @@ protected DispatchRate internalGetTopicDispatchRate() {
}
protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
- log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
Entry policiesNode = null;
@@ -1238,7 +1241,7 @@ protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
}
protected DispatchRate internalGetSubscriptionDispatchRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
@@ -1251,9 +1254,10 @@ protected DispatchRate internalGetSubscriptionDispatchRate() {
}
protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
- log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
+
Entry policiesNode = null;
try {
@@ -1288,7 +1292,7 @@ protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
}
protected SubscribeRate internalGetSubscribeRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
SubscribeRate subscribeRate = policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
if (subscribeRate != null) {
@@ -1300,8 +1304,8 @@ protected SubscribeRate internalGetSubscribeRate() {
}
protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
- log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();
+ log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
Entry policiesNode = null;
@@ -1337,7 +1341,7 @@ protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
}
protected DispatchRate internalGetReplicatorDispatchRate() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
@@ -1350,7 +1354,7 @@ protected DispatchRate internalGetReplicatorDispatchRate() {
}
protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (backlogQuotaType == null) {
@@ -1397,7 +1401,7 @@ protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, Backlo
}
protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (backlogQuotaType == null) {
@@ -1430,7 +1434,7 @@ protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) {
}
protected void internalSetRetention(RetentionPolicies retention) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -1468,7 +1472,7 @@ protected void internalSetRetention(RetentionPolicies retention) {
}
protected void internalSetPersistence(PersistencePolicies persistence) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
validatePersistencePolicies(persistence);
@@ -1499,7 +1503,7 @@ protected void internalSetPersistence(PersistencePolicies persistence) {
}
protected PersistencePolicies internalGetPersistence() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.persistence == null) {
@@ -1511,7 +1515,7 @@ protected PersistencePolicies internalGetPersistence() {
}
protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
final List> futures = Lists.newArrayList();
try {
@@ -1553,7 +1557,7 @@ protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolea
@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
Policies policies = getNamespacePolicies(namespaceName);
@@ -1574,7 +1578,7 @@ protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean a
protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
final List> futures = Lists.newArrayList();
try {
@@ -1617,7 +1621,7 @@ protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncR
@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
Policies policies = getNamespacePolicies(namespaceName);
@@ -1638,7 +1642,7 @@ protected void internalClearNamespaceBundleBacklogForSubscription(String subscri
protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
final List> futures = Lists.newArrayList();
try {
@@ -1680,7 +1684,7 @@ protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String
@SuppressWarnings("deprecation")
protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
Policies policies = getNamespacePolicies(namespaceName);
@@ -1700,7 +1704,7 @@ protected void internalUnsubscribeNamespaceBundle(String subscription, String bu
}
protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscriptionAuthMode) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (subscriptionAuthMode == null) {
@@ -1736,7 +1740,7 @@ protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscription
}
protected void internalModifyEncryptionRequired(boolean encryptionRequired) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Entry policiesNode = null;
@@ -1772,7 +1776,7 @@ protected void internalModifyEncryptionRequired(boolean encryptionRequired) {
}
protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.delayed_delivery_policies == null) {
@@ -1785,7 +1789,6 @@ protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
validateSuperUserAccess();
- validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
try {
@@ -1818,7 +1821,7 @@ protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliver
}
protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName);
@@ -1859,12 +1862,12 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
}
protected String internalGetNamespaceAntiAffinityGroup() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).antiAffinityGroup;
}
protected void internalRemoveNamespaceAntiAffinityGroup() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName);
@@ -1895,7 +1898,7 @@ protected void internalRemoveNamespaceAntiAffinityGroup() {
protected List internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
String tenant) {
- validateAdminAccessForTenant(tenant);
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster);
@@ -1947,7 +1950,7 @@ private void validatePersistencePolicies(PersistencePolicies persistence) {
}
protected RetentionPolicies internalGetRetention() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.retention_policies == null) {
@@ -2140,12 +2143,12 @@ private void validatePolicies(NamespaceName ns, Policies policies) {
protected int internalGetMaxProducersPerTopic() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_producers_per_topic;
}
protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2181,12 +2184,12 @@ protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
}
protected int internalGetMaxConsumersPerTopic() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_consumers_per_topic;
}
protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2222,12 +2225,12 @@ protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) {
}
protected int internalGetMaxConsumersPerSubscription() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_consumers_per_subscription;
}
protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2263,12 +2266,12 @@ protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscri
}
protected int internalGetMaxUnackedMessagesPerConsumer() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer;
}
protected void internalSetMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2304,12 +2307,12 @@ protected void internalSetMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPe
}
protected int internalGetMaxUnackedMessagesPerSubscription() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription;
}
protected void internalSetMaxUnackedMessagesPerSubscription(int maxUnackedMessagesPerSubscription) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2345,12 +2348,12 @@ protected void internalSetMaxUnackedMessagesPerSubscription(int maxUnackedMessag
}
protected long internalGetCompactionThreshold() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).compaction_threshold;
}
protected void internalSetCompactionThreshold(long newThreshold) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2386,7 +2389,7 @@ protected void internalSetCompactionThreshold(long newThreshold) {
}
protected long internalGetOffloadThreshold() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.offload_policies == null) {
return policies.offload_threshold;
@@ -2396,7 +2399,7 @@ protected long internalGetOffloadThreshold() {
}
protected void internalSetOffloadThreshold(long newThreshold) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2433,7 +2436,7 @@ protected void internalSetOffloadThreshold(long newThreshold) {
}
protected Long internalGetOffloadDeletionLag() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
if (policies.offload_policies == null) {
return policies.offload_deletion_lag_ms;
@@ -2443,7 +2446,7 @@ protected Long internalGetOffloadDeletionLag() {
}
protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
@@ -2481,12 +2484,12 @@ protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
@Deprecated
protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
}
protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED){
@@ -2498,7 +2501,7 @@ protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
@Deprecated
protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2509,7 +2512,7 @@ protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdate
}
protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2520,13 +2523,12 @@ protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrateg
}
protected boolean internalGetSchemaValidationEnforced() {
- validateSuperUserAccess();
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).schema_validation_enforced;
}
protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2537,13 +2539,12 @@ protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnfor
}
protected boolean internalGetIsAllowAutoUpdateSchema() {
- validateSuperUserAccess();
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).is_allow_auto_update_schema;
}
protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema) {
- validateSuperUserAccess();
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
mutatePolicy((policies) -> {
@@ -2586,7 +2587,7 @@ private void mutatePolicy(Function policyTransformation,
}
protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPolicies offloadPolicies) {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
validateOffloadPolicies(offloadPolicies);
@@ -2663,7 +2664,7 @@ private void validateOffloadPolicies(OffloadPolicies offloadPolicies) {
}
protected OffloadPolicies internalGetOffloadPolicies() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.offload_policies;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 700381d7ebf71..c9e6defdf3fe1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -27,6 +27,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -35,12 +36,16 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,7 +94,7 @@ public List getTenantNamespaces(@PathParam("property") String property)
@ApiResponse(code = 404, message = "Property or cluster doesn't exist") })
public List getNamespacesForCluster(@PathParam("property") String property,
@PathParam("cluster") String cluster) {
- validateAdminAccessForTenant(property);
+ validateTenantOperation(property, TenantOperation.LIST_NAMESPACES);
List namespaces = Lists.newArrayList();
if (!clusters().contains(cluster)) {
log.warn("[{}] Failed to get namespace list for property: {}/{} - Cluster does not exist", clientAppId(),
@@ -121,16 +126,14 @@ public void getTopics(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
- .thenAccept(topics -> {
- asyncResponse.resume(topics);
- })
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
asyncResponse.resume(ex);
@@ -145,8 +148,8 @@ public void getTopics(@PathParam("property") String property,
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public Policies getPolicies(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.ALL, PolicyOperation.READ);
return getNamespacePolicies(namespaceName);
}
@@ -228,8 +231,8 @@ public void deleteNamespaceBundle(@PathParam("property") String property,
@ApiResponse(code = 409, message = "Namespace is not empty") })
public Map> getPermissions(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION);
Policies policies = getNamespacePolicies(namespaceName);
return policies.auth_policies.namespace_auth;
@@ -294,8 +297,8 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert
@ApiResponse(code = 412, message = "Namespace is not global") })
public Set getNamespaceReplicationClusters(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.REPLICATION, PolicyOperation.READ);
return internalGetNamespaceReplicationClusters();
}
@@ -320,8 +323,8 @@ public void setNamespaceReplicationClusters(@PathParam("property") String proper
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public int getNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.TTL, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.message_ttl_in_seconds;
@@ -505,9 +508,9 @@ public void removeAutoSubscriptionCreation(@Suspended final AsyncResponse asyncR
@ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
public BundlesData getBundlesData(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validatePoliciesReadOnlyAccess();
validateNamespaceName(property, cluster, namespace);
+ validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_BUNDLE);
Policies policies = getNamespacePolicies(namespaceName);
@@ -638,8 +641,8 @@ public DispatchRate getSubscriptionDispatchRate(@PathParam("property") String pr
@ApiResponse(code = 404, message = "Namespace does not exist") })
public Map getBacklogQuotaMap(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.BACKLOG, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.backlog_quota_map;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 95cf65bef6e99..4f123a77f1789 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,6 +46,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -53,9 +54,12 @@
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -64,6 +68,7 @@
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.TenantOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,16 +96,14 @@ public void getTopics(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
+ validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
- .thenAccept(topics -> {
- asyncResponse.resume(topics);
- })
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
asyncResponse.resume(ex);
@@ -114,8 +117,8 @@ public void getTopics(@PathParam("tenant") String tenant,
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public Policies getPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.ALL, PolicyOperation.READ);
return getNamespacePolicies(namespaceName);
}
@@ -129,7 +132,7 @@ public Policies getPolicies(@PathParam("tenant") String tenant, @PathParam("name
public void createNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
Policies policies) {
validateNamespaceName(tenant, namespace);
-
+ validateTenantOperation(tenant, TenantOperation.CREATE_NAMESPACE);
policies = getDefaultPolicesIfNull(policies);
internalCreateNamespace(policies);
}
@@ -178,8 +181,8 @@ public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam
@ApiResponse(code = 409, message = "Namespace is not empty") })
public Map> getPermissions(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
+ validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_PERMISSION);
Policies policies = getNamespacePolicies(namespaceName);
return policies.auth_policies.namespace_auth;
@@ -244,9 +247,8 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert
@ApiResponse(code = 412, message = "Namespace is not global") })
public Set getNamespaceReplicationClusters(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
-
+ validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.REPLICATION, PolicyOperation.READ);
return internalGetNamespaceReplicationClusters();
}
@@ -270,9 +272,8 @@ public void setNamespaceReplicationClusters(@PathParam("tenant") String tenant,
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public int getNamespaceMessageTTL(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
-
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.TTL, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.message_ttl_in_seconds;
@@ -410,9 +411,9 @@ public void removeAutoSubscriptionCreation(@Suspended final AsyncResponse asyncR
@ApiResponse(code = 412, message = "Namespace is not setup to split in bundles") })
public BundlesData getBundlesData(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validatePoliciesReadOnlyAccess();
validateNamespaceName(tenant, namespace);
+ validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_BUNDLE);
Policies policies = getNamespacePolicies(namespaceName);
@@ -584,9 +585,8 @@ public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant
@ApiResponse(code = 404, message = "Namespace does not exist") })
public Map getBacklogQuotaMap(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
-
+ validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.BACKLOG, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.backlog_quota_map;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1eefc37cdb98d..bc04f333b0335 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -59,6 +59,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -77,6 +78,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
@@ -292,8 +294,8 @@ protected void handleLookup(CommandLookupTopic lookup) {
}
CompletableFuture isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService().canLookupAsync(topicName, authRole,
- authenticationData);
+ isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -364,8 +366,8 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
}
CompletableFuture isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService()
- .canLookupAsync(topicName, authRole, authenticationData);
+ isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -760,8 +762,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
CompletableFuture isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService().canConsumeAsync(topicName, authRole,
- authenticationData, subscribe.getSubscription());
+ authenticationData.setSubscription(subscriptionName);
+ isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -769,9 +772,13 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (isProxyAuthorized) {
CompletableFuture authorizationFuture;
if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
- originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
- subscriptionName);
+ if (authenticationData == null) {
+ authenticationData = new AuthenticationDataCommand("", subscriptionName);
+ } else {
+ authenticationData.setSubscription(subscriptionName);
+ }
+ authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
@@ -979,8 +986,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
CompletableFuture isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationService().canProduceAsync(topicName,
- authRole, authenticationData);
+ isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -988,8 +995,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
if (isProxyAuthorized) {
CompletableFuture authorizationFuture;
if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationService().canProduceAsync(topicName,
- originalPrincipal != null ? originalPrincipal : authRole, authenticationData);
+ authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 2273d1d6751d0..96569e37900f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -61,8 +61,13 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -771,4 +776,69 @@ protected static boolean isLeaderBroker(PulsarService pulsar) {
// Non-Usual HTTP error codes
protected static final int NOT_IMPLEMENTED = 501;
+ public void validateTenantOperation(String tenant, TenantOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
+ }
+
+ Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowTenantOperation(
+ tenant, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+ if (!isAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTenantOperation for" +
+ " originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]",
+ originalPrincipal(), clientAppId(), operation.toString(), tenant));
+ }
+ }
+ }
+
+ public void validateNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+ }
+
+ Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowNamespaceOperation(namespaceName, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+ if (!isAuthorized) {
+ throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespaceOperation for" +
+ " operation [%s] on namespace [%s]", operation.toString(), namespaceName));
+ }
+ }
+ }
+
+ public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
+ }
+
+ Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+ if (!isAuthorized) {
+ throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespacePolicyOperation for" +
+ " operation [%s] on namespace [%s] on policy [%s]", operation.toString(), namespaceName, policy.toString()));
+ }
+ }
+ }
+
+ public void validateTopicOperation(TopicName topicName, TopicOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
+ }
+
+ Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
+ .allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), clientAuthData());
+
+ if (!isAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for" +
+ " operation [%s] on topic [%s]", operation.toString(), topicName));
+ }
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index f98758cc60222..2df24bb2e4358 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -81,9 +81,12 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -154,9 +157,6 @@ public void setup() throws Exception {
doReturn(null).when(namespaces).originalPrincipal();
doReturn(null).when(namespaces).clientAuthData();
doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
- doNothing().when(namespaces).validateAdminAccessForTenant(this.testTenant);
- doNothing().when(namespaces).validateAdminAccessForTenant("non-existing-tenant");
- doNothing().when(namespaces).validateAdminAccessForTenant("new-property");
admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:8080"));
admin.clusters().createCluster("usw", new ClusterData("http://broker-usw.com:8080"));
@@ -171,7 +171,15 @@ public void setup() throws Exception {
new BundlesData());
doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
- .validateAdminAccessForTenant(this.testOtherTenant);
+ .validateTenantOperation(this.testOtherTenant, null);
+
+ doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+ .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+
+ doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+ .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.REPLICATION, PolicyOperation.WRITE);
nsSvc = pulsar.getNamespaceService();
}
@@ -878,7 +886,6 @@ private void createTestNamespaces(List nsnames, BundlesData bundl
@Test
public void testValidateAdminAccessOnTenant() throws Exception {
-
try {
final String property = "prop";
pulsar.getConfiguration().setAuthenticationEnabled(true);
@@ -888,7 +895,8 @@ public void testValidateAdminAccessOnTenant() throws Exception {
new TenantInfo(Sets.newHashSet(namespaces.clientAppId()), Sets.newHashSet("use")));
ZkUtils.createFullPathOptimistic(pulsar.getConfigurationCache().getZooKeeper(), path, data.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- namespaces.validateAdminAccessForTenant(property);
+
+ namespaces.validateTenantOperation(property, null);
} finally {
pulsar.getConfiguration().setAuthenticationEnabled(false);
pulsar.getConfiguration().setAuthorizationEnabled(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index cb913fb9348b3..aa718010ad583 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -478,8 +478,8 @@ public void testProducerOnNotOwnedTopic() throws Exception {
@Test(timeOut = 30000)
public void testProducerCommandWithAuthorizationPositive() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
- doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canProduceAsync(Mockito.any(),
- Mockito.any(), Mockito.any());
+ doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
resetChannel();
@@ -605,8 +605,8 @@ public void testNonExistentTopicSuperUserAccess() throws Exception {
public void testProducerCommandWithAuthorizationNegative() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
- doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canProduceAsync(Mockito.any(),
- Mockito.any(), Mockito.any());
+ doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1195,8 +1195,8 @@ public void testUnsupportedBatchMsgSubscribeCommand() throws Exception {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
- doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canConsumeAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any());
+ doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1217,8 +1217,8 @@ public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
- doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canConsumeAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any());
+ doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 7b324c0ba2b79..ddeed965eb523 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
@@ -49,7 +50,11 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -487,6 +492,41 @@ public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName n
String subscriptionName, String role, String authDataJson) {
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture allowTenantOperationAsync(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ return true;
+ }
}
/**
@@ -515,21 +555,32 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol
}
public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
+ @Override
+ public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ try {
+ return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
@Override
- public CompletableFuture canConsumeAsync(TopicName topicName, String role,
- AuthenticationDataSource authenticationData, String subscription) {
+ public CompletableFuture allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) {
CompletableFuture future = new CompletableFuture<>();
- if (isNotBlank(subscription)) {
- if (!subscription.startsWith(role)) {
- future.completeExceptionally(new PulsarServerException(
- "The subscription name needs to be prefixed by the authentication role"));
+ if (authData.hasSubscription()) {
+ String subscription = authData.getSubscription();
+ if (isNotBlank(subscription)) {
+ if (!subscription.startsWith(role)) {
+ future.completeExceptionally(new PulsarServerException(
+ "The subscription name needs to be prefixed by the authentication role"));
+ }
}
}
future.complete(clientRole.equals(role));
return future;
}
-
}
public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
index cc3cb3ba3ed66..ec9d4e4f0bd94 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
@@ -61,6 +61,8 @@ public class DiscoveryServiceWebTest extends ProducerConsumerBase {
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
+ super.conf.setAuthorizationEnabled(true);
+ super.conf.setAuthenticationEnabled(true);
}
@AfterMethod
@@ -99,9 +101,9 @@ public void testRedirectUrlWithServerStarted() throws Exception {
**/
assertEquals(hitBrokerService(HttpMethod.POST, postRequestUrl, Lists.newArrayList("use")),
- "Tenant does not exist");
- assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Tenant does not exist");
- assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Tenant does not exist");
+ "Need to authenticate to perform the request");
+ assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Need to authenticate to perform the request");
+ assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Need to authenticate to perform the request");
server.stop();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
new file mode 100644
index 0000000000000..bda93c4d9a03d
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Namespace authorization operations.
+ */
+public enum NamespaceOperation {
+ CREATE_TOPIC,
+ GET_TOPIC,
+ GET_TOPICS,
+ DELETE_TOPIC,
+
+ ADD_BUNDLE,
+ DELETE_BUNDLE,
+ GET_BUNDLE,
+
+ GET_PERMISSION,
+ GRANT_PERMISSION,
+ REVOKE_PERMISSION,
+
+ CLEAR_BACKLOG,
+ UNSUBSCRIBE,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
new file mode 100644
index 0000000000000..439ed7b82c9ec
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * PolicyName authorization operations.
+ */
+public enum PolicyName {
+ ALL,
+ ANTI_AFFINITY,
+ BACKLOG,
+ COMPACTION,
+ DELAYED_DELIVERY,
+ DEDUPLICATION,
+ MAX_CONSUMERS,
+ MAX_PRODUCERS,
+ MAX_UNACKED,
+ OFFLOAD,
+ PERSISTENCE,
+ RATE,
+ RETENTION,
+ REPLICATION,
+ REPLICATION_RATE,
+ SCHEMA_COMPATIBILITY_STRATEGY,
+ SUBSCRIPTION_AUTH_MODE,
+ ENCRYPTION,
+ TTL,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyOperation.java
new file mode 100644
index 0000000000000..ce70341a71604
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyOperation.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * PolicyOperation authorization operations.
+ */
+public enum PolicyOperation {
+ READ,
+ WRITE,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java
new file mode 100644
index 0000000000000..b444433f6fa90
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Tenant authorization operations.
+ */
+public enum TenantOperation {
+ CREATE_NAMESPACE,
+ DELETE_NAMESPACE,
+ LIST_NAMESPACES,
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
new file mode 100644
index 0000000000000..7e54cca1515d3
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Topic authorization operations.
+ */
+public enum TopicOperation {
+ LOOKUP,
+ PRODUCE,
+ CONSUME,
+
+ COMPACT,
+ EXPIRE_MESSAGES,
+ OFFLOAD,
+ PEEK_MESSAGES,
+ RESET_CURSOR,
+ SKIP,
+ TERMINATE,
+ UNLOAD,
+
+ GRANT_PERMISSION,
+ GET_PERMISSION,
+ REVOKE_PERMISSION,
+
+ ADD_BUNDLE_RANGE,
+ GET_BUNDLE_RANGE,
+ DELETE_BUNDLE_RANGE,
+
+ SUBSCRIBE,
+ GET_SUBSCRIPTIONS,
+ UNSUBSCRIBE,
+}