Skip to content

Commit

Permalink
[improve][broker] Require authRole is proxyRole to set originalPrinci…
Browse files Browse the repository at this point in the history
…pal (apache#19455)

Co-authored-by: Lari Hotari <lhotari@apache.org>
  • Loading branch information
michaeljmarshall and lhotari committed Feb 14, 2023
1 parent c4c1744 commit aa63a55
Show file tree
Hide file tree
Showing 23 changed files with 161 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.pulsar.broker.authorization;

import static java.util.concurrent.TimeUnit.SECONDS;
import java.net.SocketAddress;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -37,7 +37,6 @@
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -293,19 +292,39 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return provider.allowSinkOpsAsync(namespaceName, role, authenticationData);
}

private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
String originalPrincipal) {
if (proxyRoles.contains(authenticatedPrincipal)) {
// Request has come from a proxy
public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
String originalPrincipal,
AuthenticationDataSource authDataSource) {
SocketAddress remoteAddress = authDataSource != null ? authDataSource.getPeerAddress() : null;
return isValidOriginalPrincipal(authenticatedPrincipal, originalPrincipal, remoteAddress);
}

/**
* Validates that the authenticatedPrincipal and the originalPrincipal are a valid combination.
* Valid combinations fulfill the following rule: the authenticatedPrincipal is in
* {@link ServiceConfiguration#getProxyRoles()}, if, and only if, the originalPrincipal is set to a role
* that is not also in {@link ServiceConfiguration#getProxyRoles()}.
* @return true when roles are a valid combination and false when roles are an invalid combination
*/
public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
String originalPrincipal,
SocketAddress remoteAddress) {
String errorMsg = null;
if (conf.getProxyRoles().contains(authenticatedPrincipal)) {
if (StringUtils.isBlank(originalPrincipal)) {
log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the "
+ "request is via proxy.");
}
if (proxyRoles.contains(originalPrincipal)) {
log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
errorMsg = "originalPrincipal must be provided when connecting with a proxy role.";
} else if (conf.getProxyRoles().contains(originalPrincipal)) {
errorMsg = "originalPrincipal cannot be a proxy role.";
}
} else if (StringUtils.isNotBlank(originalPrincipal)) {
errorMsg = "cannot specify originalPrincipal when connecting without valid proxy role.";
}
if (errorMsg != null) {
log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress,
authenticatedPrincipal, originalPrincipal, errorMsg);
return false;
} else {
return true;
}
}

Expand Down Expand Up @@ -340,7 +359,9 @@ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
String originalRole,
String role,
AuthenticationDataSource authData) {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync(
tenantName, operation, role, authData);
Expand Down Expand Up @@ -400,7 +421,9 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
String originalRole,
String role,
AuthenticationDataSource authData) {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync(
namespaceName, operation, role, authData);
Expand Down Expand Up @@ -442,7 +465,9 @@ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceNa
String originalRole,
String role,
AuthenticationDataSource authData) {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, role, authData);
Expand Down Expand Up @@ -503,10 +528,8 @@ public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic
String originalRole,
String role,
AuthenticationDataSource authData) {
try {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
} catch (RestException e) {
return FutureUtil.failedFuture(e);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicPolicyOperationAsync(
Expand Down Expand Up @@ -594,7 +617,9 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
String originalRole,
String role,
AuthenticationDataSource authData) {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicOperationAsync(
topicName, operation, role, authData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4317,7 +4317,7 @@ protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean author
});
}

public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private int nonPersistentPendingMessages = 0;
private final int maxNonPersistentPendingMessages;
private String originalPrincipal = null;
private Set<String> proxyRoles;
private final boolean schemaValidationEnforced;
private String authMethod = "none";
private final int maxMessageSize;
Expand Down Expand Up @@ -282,7 +281,6 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.recentlyClosedProducers = new HashMap<>();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
this.maxMessageSize = conf.getMaxMessageSize();
this.maxPendingSendRequests = conf.getMaxPendingPublishRequestsPerConnection();
Expand Down Expand Up @@ -400,32 +398,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.close();
}

/**
* When transitioning from Connecting to Connected, this method validates the roles.
* If the authRole is one of proxyRoles, the following must be true:
* - the originalPrincipal is given while connecting
* - originalPrincipal is not blank
* - originalPrincipal is not a proxy principal.
* @return true when roles are valid and false when roles are invalid
*/
private boolean isValidRoleAndOriginalPrincipal() {
String errorMsg = null;
if (proxyRoles.contains(authRole)) {
if (StringUtils.isBlank(originalPrincipal)) {
errorMsg = "originalPrincipal must be provided when connecting with a proxy role.";
} else if (proxyRoles.contains(originalPrincipal)) {
errorMsg = "originalPrincipal cannot be a proxy role.";
}
}
if (errorMsg != null) {
log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress, authRole,
originalPrincipal, errorMsg);
return false;
} else {
return true;
}
}

// ////
// // Incoming commands handling
// ////
Expand Down Expand Up @@ -694,7 +666,8 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {
// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion) {
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
if (!isValidRoleAndOriginalPrincipal()) {
if (!service.getAuthorizationService()
.isValidOriginalPrincipal(authRole, originalPrincipal, remoteAddress)) {
state = State.Failed;
service.getPulsarStats().recordConnectionCreateFail();
final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,11 @@ public static boolean isClientAuthenticated(String appId) {
return appId != null;
}

private static void validateOriginalPrincipal(Set<String> proxyRoles, String authenticatedPrincipal,
String originalPrincipal) {
if (proxyRoles.contains(authenticatedPrincipal)) {
// Request has come from a proxy
if (StringUtils.isBlank(originalPrincipal)) {
log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
throw new RestException(Status.UNAUTHORIZED,
"Original principal cannot be empty if the request is via proxy.");
}
if (proxyRoles.contains(originalPrincipal)) {
log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
throw new RestException(Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
}
private void validateOriginalPrincipal(String authenticatedPrincipal, String originalPrincipal) {
if (!pulsar.getBrokerService().getAuthorizationService()
.isValidOriginalPrincipal(authenticatedPrincipal, originalPrincipal, clientAuthData())) {
throw new RestException(Status.UNAUTHORIZED,
"Invalid combination of Original principal cannot be empty if the request is via proxy.");
}
}

Expand All @@ -185,7 +177,7 @@ public CompletableFuture<Void> validateSuperUserAccessAsync(){
isClientAuthenticated(appId), appId);
}
String originalPrincipal = originalPrincipal();
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), appId, originalPrincipal);
validateOriginalPrincipal(appId, originalPrincipal);

if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
BrokerService brokerService = pulsar.getBrokerService();
Expand Down Expand Up @@ -260,7 +252,7 @@ protected void validateAdminAccessForTenant(String tenant) {
}
}

protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
protected void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
String originalPrincipal, String tenant,
AuthenticationDataSource authenticationData,
long timeout, TimeUnit unit) {
Expand All @@ -287,7 +279,7 @@ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String tenan
clientAuthData());
}

protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
protected CompletableFuture<Void> validateAdminAccessForTenantAsync(
PulsarService pulsar, String clientAppId,
String originalPrincipal, String tenant,
AuthenticationDataSource authenticationData) {
Expand All @@ -306,8 +298,7 @@ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
if (!isClientAuthenticated(clientAppId)) {
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId,
originalPrincipal);
validateOriginalPrincipal(clientAppId, originalPrincipal);
if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.EnumSet;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.common.naming.TopicDomain;
Expand All @@ -33,6 +38,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -229,6 +235,39 @@ public void simple() throws Exception {
admin.clusters().deleteCluster("c1");
}

@Test
public void testOriginalRoleValidation() throws Exception {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProxyRoles(Collections.singleton("proxy"));
AuthorizationService auth = new AuthorizationService(conf, Mockito.mock(PulsarResources.class));

// Original principal should be supplied when authenticatedPrincipal is proxy role
assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (SocketAddress) null));

// Non proxy role should not supply originalPrincipal
assertTrue(auth.isValidOriginalPrincipal("client", "", (SocketAddress) null));
assertTrue(auth.isValidOriginalPrincipal("client", null, (SocketAddress) null));

// Only likely in cases when authentication is disabled, but we still define these to be valid.
assertTrue(auth.isValidOriginalPrincipal(null, null, (SocketAddress) null));
assertTrue(auth.isValidOriginalPrincipal(null, "", (SocketAddress) null));
assertTrue(auth.isValidOriginalPrincipal("", null, (SocketAddress) null));
assertTrue(auth.isValidOriginalPrincipal("", "", (SocketAddress) null));

// Proxy role must supply an original principal
assertFalse(auth.isValidOriginalPrincipal("proxy", "", (SocketAddress) null));
assertFalse(auth.isValidOriginalPrincipal("proxy", null, (SocketAddress) null));

// OriginalPrincipal cannot be proxy role
assertFalse(auth.isValidOriginalPrincipal("proxy", "proxy", (SocketAddress) null));
assertFalse(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null));
assertFalse(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null));
assertFalse(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null));

// Must gracefully handle a missing AuthenticationDataSource
assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (AuthenticationDataSource) null));
}

@Test
public void testGetListWithGetBundleOp() throws Exception {
String tenant = "p1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
public final static String CLIENT_KEYSTORE_PW = "111111";
public final static String CLIENT_TRUSTSTORE_PW = "111111";

public final static String PROXY_KEYSTORE_FILE_PATH =
ResourceUtils.getAbsolutePath("certificate-authority/jks/proxy.keystore.jks");
public final static String PROXY_KEYSTORE_PW = "111111";
public final static String PROXY_AND_CLIENT_TRUSTSTORE_FILE_PATH =
ResourceUtils.getAbsolutePath("certificate-authority/jks/proxy-and-client.truststore.jks");
public final static String PROXY_AND_CLIENT_TRUSTSTORE_PW = "111111";

public final static String CLIENT_KEYSTORE_CN = "clientuser";
public final static String KEYSTORE_TYPE = "JKS";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@ public void testConnectCommandWithInvalidRoleCombinations() throws Exception {
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "pass.proxy");
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "");
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", null);
// Invalid combinations where original principal is set to a pass.proxy role
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client", "pass.proxy");
// Invalid combinations where the original principal is set to a non-proxy role
verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client1", "pass.client");
}

private void verifyAuthRoleAndOriginalPrincipalBehavior(String authMethodName, String authData,
Expand Down
Loading

0 comments on commit aa63a55

Please sign in to comment.