Skip to content

Commit

Permalink
[improve][broker] Require authRole is proxyRole to set originalPrinci…
Browse files Browse the repository at this point in the history
…pal (apache#19455)

Co-authored-by: Lari Hotari <lhotari@apache.org>
(cherry picked from commit aa63a55)
  • Loading branch information
michaeljmarshall committed Feb 16, 2023
1 parent 6132b46 commit 7b2a14c
Show file tree
Hide file tree
Showing 20 changed files with 160 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.pulsar.broker.authorization;

import static java.util.concurrent.TimeUnit.SECONDS;
import java.net.SocketAddress;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -37,7 +37,6 @@
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -293,19 +292,39 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return provider.allowSinkOpsAsync(namespaceName, role, authenticationData);
}

private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
String originalPrincipal) {
if (proxyRoles.contains(authenticatedPrincipal)) {
// Request has come from a proxy
public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
String originalPrincipal,
AuthenticationDataSource authDataSource) {
SocketAddress remoteAddress = authDataSource != null ? authDataSource.getPeerAddress() : null;
return isValidOriginalPrincipal(authenticatedPrincipal, originalPrincipal, remoteAddress);
}

/**
* Validates that the authenticatedPrincipal and the originalPrincipal are a valid combination.
* Valid combinations fulfill the following rule: the authenticatedPrincipal is in
* {@link ServiceConfiguration#getProxyRoles()}, if, and only if, the originalPrincipal is set to a role
* that is not also in {@link ServiceConfiguration#getProxyRoles()}.
* @return true when roles are a valid combination and false when roles are an invalid combination
*/
public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
String originalPrincipal,
SocketAddress remoteAddress) {
String errorMsg = null;
if (conf.getProxyRoles().contains(authenticatedPrincipal)) {
if (StringUtils.isBlank(originalPrincipal)) {
log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the "
+ "request is via proxy.");
}
if (proxyRoles.contains(originalPrincipal)) {
log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
errorMsg = "originalPrincipal must be provided when connecting with a proxy role.";
} else if (conf.getProxyRoles().contains(originalPrincipal)) {
errorMsg = "originalPrincipal cannot be a proxy role.";
}
} else if (StringUtils.isNotBlank(originalPrincipal)) {
errorMsg = "cannot specify originalPrincipal when connecting without valid proxy role.";
}
if (errorMsg != null) {
log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress,
authenticatedPrincipal, originalPrincipal, errorMsg);
return false;
} else {
return true;
}
}

Expand Down Expand Up @@ -340,7 +359,9 @@ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
String originalRole,
String role,
AuthenticationDataSource authData) {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync(
tenantName, operation, role, authData);
Expand Down Expand Up @@ -396,7 +417,9 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
String originalRole,
String role,
AuthenticationDataSource authData) {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync(
namespaceName, operation, role, authData);
Expand Down Expand Up @@ -438,7 +461,9 @@ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceNa
String originalRole,
String role,
AuthenticationDataSource authData) {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, role, authData);
Expand Down Expand Up @@ -495,10 +520,8 @@ public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic
String originalRole,
String role,
AuthenticationDataSource authData) {
try {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
} catch (RestException e) {
return FutureUtil.failedFuture(e);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicPolicyOperationAsync(
Expand Down Expand Up @@ -582,7 +605,9 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
String originalRole,
String role,
AuthenticationDataSource authData) {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicOperationAsync(
topicName, operation, role, authData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3991,7 +3991,7 @@ protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean author
});
}

public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private int nonPersistentPendingMessages = 0;
private final int maxNonPersistentPendingMessages;
private String originalPrincipal = null;
private Set<String> proxyRoles;
private boolean authenticateOriginalAuthData;
private final boolean schemaValidationEnforced;
private String authMethod = "none";
Expand Down Expand Up @@ -261,7 +260,6 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.recentlyClosedProducers = new HashMap<>();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
this.authenticateOriginalAuthData = conf.isAuthenticateOriginalAuthData();
this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
this.maxMessageSize = conf.getMaxMessageSize();
Expand Down Expand Up @@ -367,32 +365,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.close();
}

/**
* When transitioning from Connecting to Connected, this method validates the roles.
* If the authRole is one of proxyRoles, the following must be true:
* - the originalPrincipal is given while connecting
* - originalPrincipal is not blank
* - originalPrincipal is not a proxy principal.
* @return true when roles are valid and false when roles are invalid
*/
private boolean isValidRoleAndOriginalPrincipal() {
String errorMsg = null;
if (proxyRoles.contains(authRole)) {
if (StringUtils.isBlank(originalPrincipal)) {
errorMsg = "originalPrincipal must be provided when connecting with a proxy role.";
} else if (proxyRoles.contains(originalPrincipal)) {
errorMsg = "originalPrincipal cannot be a proxy role.";
}
}
if (errorMsg != null) {
log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress, authRole,
originalPrincipal, errorMsg);
return false;
} else {
return true;
}
}

// ////
// // Incoming commands handling
// ////
Expand Down Expand Up @@ -685,7 +657,8 @@ private void doAuthentication(AuthData clientData,
if (state != State.Connected) {
// First time authentication is done
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
if (!isValidRoleAndOriginalPrincipal()) {
if (!service.getAuthorizationService()
.isValidOriginalPrincipal(this.authRole, originalPrincipal, remoteAddress)) {
state = State.Failed;
service.getPulsarStats().recordConnectionCreateFail();
final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,11 @@ public static boolean isClientAuthenticated(String appId) {
return appId != null;
}

private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
String originalPrincipal) {
if (proxyRoles.contains(authenticatedPrincipal)) {
// Request has come from a proxy
if (StringUtils.isBlank(originalPrincipal)) {
log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
throw new RestException(Status.UNAUTHORIZED,
"Original principal cannot be empty if the request is via proxy.");
}
if (proxyRoles.contains(originalPrincipal)) {
log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
}
private void validateOriginalPrincipal(String authenticatedPrincipal, String originalPrincipal) {
if (!pulsar.getBrokerService().getAuthorizationService()
.isValidOriginalPrincipal(authenticatedPrincipal, originalPrincipal, clientAuthData())) {
throw new RestException(Status.UNAUTHORIZED,
"Invalid combination of Original principal cannot be empty if the request is via proxy.");
}
}

Expand All @@ -184,7 +176,7 @@ public CompletableFuture<Void> validateSuperUserAccessAsync(){
isClientAuthenticated(appId), appId);
}
String originalPrincipal = originalPrincipal();
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), appId, originalPrincipal);
validateOriginalPrincipal(appId, originalPrincipal);

if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
BrokerService brokerService = pulsar.getBrokerService();
Expand Down Expand Up @@ -259,7 +251,7 @@ protected void validateAdminAccessForTenant(String tenant) {
}
}

protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
protected void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
String originalPrincipal, String tenant,
AuthenticationDataSource authenticationData,
long timeout, TimeUnit unit) {
Expand All @@ -286,7 +278,7 @@ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String tenan
clientAuthData());
}

protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
protected CompletableFuture<Void> validateAdminAccessForTenantAsync(
PulsarService pulsar, String clientAppId,
String originalPrincipal, String tenant,
AuthenticationDataSource authenticationData) {
Expand All @@ -305,8 +297,7 @@ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
if (!isClientAuthenticated(clientAppId)) {
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId,
originalPrincipal);
validateOriginalPrincipal(clientAppId, originalPrincipal);
if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.EnumSet;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.common.naming.TopicDomain;
Expand All @@ -32,11 +38,11 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.Sets;

@Test(groups = "flaky")
public class AuthorizationTest extends MockedPulsarServiceBaseTest {
Expand Down Expand Up @@ -229,6 +235,39 @@ public void simple() throws Exception {
admin.clusters().deleteCluster("c1");
}

@Test
public void testOriginalRoleValidation() throws Exception {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProxyRoles(Collections.singleton("proxy"));
AuthorizationService auth = new AuthorizationService(conf, Mockito.mock(PulsarResources.class));

// Original principal should be supplied when authenticatedPrincipal is proxy role
assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (SocketAddress) null));

// Non proxy role should not supply originalPrincipal
assertTrue(auth.isValidOriginalPrincipal("client", "", (SocketAddress) null));
assertTrue(auth.isValidOriginalPrincipal("client", null, (SocketAddress) null));

// Only likely in cases when authentication is disabled, but we still define these to be valid.
assertTrue(auth.isValidOriginalPrincipal(null, null, (SocketAddress) null));
assertTrue(auth.isValidOriginalPrincipal(null, "", (SocketAddress) null));
assertTrue(auth.isValidOriginalPrincipal("", null, (SocketAddress) null));
assertTrue(auth.isValidOriginalPrincipal("", "", (SocketAddress) null));

// Proxy role must supply an original principal
assertFalse(auth.isValidOriginalPrincipal("proxy", "", (SocketAddress) null));
assertFalse(auth.isValidOriginalPrincipal("proxy", null, (SocketAddress) null));

// OriginalPrincipal cannot be proxy role
assertFalse(auth.isValidOriginalPrincipal("proxy", "proxy", (SocketAddress) null));
assertFalse(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null));
assertFalse(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null));
assertFalse(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null));

// Must gracefully handle a missing AuthenticationDataSource
assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (AuthenticationDataSource) null));
}

@Test
public void testGetListWithGetBundleOp() throws Exception {
String tenant = "p1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ public void testConnectCommandWithInvalidRoleCombinations() throws Exception {
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "pass.proxy");
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "");
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", null);
// Invalid combinations where original principal is set to a pass.proxy role
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client", "pass.proxy");
// Invalid combinations where the original principal is set to a non-proxy role
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client1", "pass.client");
}

private void verifyAuthRoleAndOriginalPrincipalBehavior(String authMethodName, String authData,
Expand Down

0 comments on commit 7b2a14c

Please sign in to comment.