Skip to content

Commit

Permalink
Async authorization check while creation of producer/consumer (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Nov 9, 2016
1 parent 15683fd commit afaa63f
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 223 deletions.
Expand Up @@ -16,8 +16,8 @@
package com.yahoo.pulsar.broker.authorization;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,7 +26,6 @@
import com.yahoo.pulsar.broker.cache.ConfigurationCacheService;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.AuthAction;
import com.yahoo.pulsar.common.policies.data.Policies;

/**
*/
Expand All @@ -50,9 +49,19 @@ public AuthorizationManager(ServiceConfiguration conf, ConfigurationCacheService
* @param role
* the app id used to send messages to the destination.
*/
public boolean canProduce(DestinationName destination, String role) {
public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role) {
return checkAuthorization(destination, role, AuthAction.produce);
}

public boolean canProduce(DestinationName destination, String role) {
try {
return canProduceAsync(destination, role).get();
} catch (Exception e) {
log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
return false;
}
}

/**
* Check if the specified role has permission to receive messages from the specified fully qualified destination
Expand All @@ -63,9 +72,19 @@ public boolean canProduce(DestinationName destination, String role) {
* @param role
* the app id used to receive messages from the destination.
*/
public boolean canConsume(DestinationName destination, String role) {
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role) {
return checkAuthorization(destination, role, AuthAction.consume);
}

public boolean canConsume(DestinationName destination, String role) {
try {
return canConsumeAsync(destination, role).get();
} catch (Exception e) {
log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
return false;
}
}

/**
* Check whether the specified role can perform a lookup for the specified destination.
Expand All @@ -80,10 +99,14 @@ public boolean canLookup(DestinationName destination, String role) {
return canProduce(destination, role) || canConsume(destination, role);
}

private boolean checkAuthorization(DestinationName destination, String role, AuthAction action) {
if (isSuperUser(role))
return true;
return checkPermission(destination, role, action) && checkCluster(destination);
private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
AuthAction action) {
if (isSuperUser(role)) {
return CompletableFuture.completedFuture(true);
} else {
return checkPermission(destination, role, action)
.thenApply(isPermission -> isPermission && checkCluster(destination));
}
}

private boolean checkCluster(DestinationName destination) {
Expand All @@ -98,38 +121,49 @@ private boolean checkCluster(DestinationName destination) {
}
}

public boolean checkPermission(DestinationName destination, String role, AuthAction action) {
public CompletableFuture<Boolean> checkPermission(DestinationName destination, String role,
AuthAction action) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
Optional<Policies> policies = configCache.policiesCache().get(POLICY_ROOT + destination.getNamespace());
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for destination : {}", destination);
configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for destination : {}", destination);
}
permissionFuture.complete(false);
} else {
Set<AuthAction> namespaceActions = policies.get().auth_policies.namespace_auth.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
// The role has namespace level permission
permissionFuture.complete(true);
} else {
Map<String, Set<AuthAction>> roles = policies.get().auth_policies.destination_auth
.get(destination.toString());
if (roles == null) {
// Destination has no custom policy
permissionFuture.complete(false);
} else {
Set<AuthAction> resourceActions = roles.get(role);
if (resourceActions != null && resourceActions.contains(action)) {
// The role has destination level permission
permissionFuture.complete(true);
} else {
permissionFuture.complete(false);
}
}
}
}
return false;
}

Set<AuthAction> namespaceActions = policies.get().auth_policies.namespace_auth.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
// The role has namespace level permission
return true;
}

Map<String, Set<AuthAction>> roles = policies.get().auth_policies.destination_auth.get(destination.toString());
if (roles == null) {
// Destination has no custom policy
return false;
}

Set<AuthAction> resourceActions = roles.get(role);
if (resourceActions != null && resourceActions.contains(action)) {
// The role has destination level permission
return true;
}
return false;
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
ex);
permissionFuture.complete(false);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
return false;
permissionFuture.complete(false);
}
return permissionFuture;
}

/**
Expand Down

0 comments on commit afaa63f

Please sign in to comment.