Skip to content

Commit

Permalink
Refactored databus to enforce ownership of subscriptions.
Browse files Browse the repository at this point in the history
  • Loading branch information
billkalter committed Sep 22, 2016
1 parent 99b44c9 commit 45b1ae0
Show file tree
Hide file tree
Showing 44 changed files with 1,596 additions and 215 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.bazaarvoice.emodb.databus.api;

import com.bazaarvoice.emodb.common.api.UnauthorizedException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Thrown when an unauthorized databus operation is performed on a subscription, such when a subscription created with
* one API key is polled using a different key.
*/
@JsonIgnoreProperties({"cause", "localizedMessage", "stackTrace", "message", "suppressed"})
public class UnauthorizedSubscriptionException extends UnauthorizedException {
private final String _subscription;

public UnauthorizedSubscriptionException() {
_subscription = null;
}

public UnauthorizedSubscriptionException(String subscription) {
super(subscription);
_subscription = subscription;
}

@JsonCreator
public UnauthorizedSubscriptionException(@JsonProperty("reason") String message, @JsonProperty("subscription") String subscription) {
super(message);
_subscription = subscription;
}

public String getSubscription() {
return _subscription;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.bazaarvoice.emodb.databus.api.MoveSubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.ReplaySubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.Subscription;
import com.bazaarvoice.emodb.databus.api.UnauthorizedSubscriptionException;
import com.bazaarvoice.emodb.databus.api.UnknownMoveException;
import com.bazaarvoice.emodb.databus.api.UnknownReplayException;
import com.bazaarvoice.emodb.databus.api.UnknownSubscriptionException;
Expand Down Expand Up @@ -419,6 +420,13 @@ private RuntimeException convertException(EmoClientException e) {
} else if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() &&
UnknownReplayException.class.getName().equals(exceptionType)) {
return response.getEntity(UnknownReplayException.class);
} else if (response.getStatus() == Response.Status.FORBIDDEN.getStatusCode() &&
UnauthorizedSubscriptionException.class.getName().equals(exceptionType)) {
if (response.hasEntity()) {
return (RuntimeException) response.getEntity(UnauthorizedSubscriptionException.class).initCause(e);
} else {
return (RuntimeException) new UnauthorizedSubscriptionException().initCause(e);
}
} else if (response.getStatus() == Response.Status.FORBIDDEN.getStatusCode() &&
UnauthorizedException.class.getName().equals(exceptionType)) {
if (response.hasEntity()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import com.bazaarvoice.emodb.databus.core.CanaryManager;
import com.bazaarvoice.emodb.databus.core.DatabusChannelConfiguration;
import com.bazaarvoice.emodb.databus.core.DatabusEventStore;
import com.bazaarvoice.emodb.databus.core.DatabusFactory;
import com.bazaarvoice.emodb.databus.core.DedupMigrationTask;
import com.bazaarvoice.emodb.databus.core.DefaultDatabus;
import com.bazaarvoice.emodb.databus.core.DefaultFanoutManager;
import com.bazaarvoice.emodb.databus.core.DefaultRateLimitedLogFactory;
import com.bazaarvoice.emodb.databus.core.FanoutManager;
import com.bazaarvoice.emodb.databus.core.MasterFanout;
import com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus;
import com.bazaarvoice.emodb.databus.core.RateLimitedLogFactory;
import com.bazaarvoice.emodb.databus.core.SubscriptionEvaluator;
import com.bazaarvoice.emodb.databus.core.SystemQueueMonitorManager;
Expand Down Expand Up @@ -86,15 +88,17 @@
* <li> @{@link Global} {@link CuratorFramework}
* <li> Jersey {@link Client}
* <li> @{@link ReplicationKey} String
* <li> @{@link SystemInternalId} String
* <li> DataStore {@link DataProvider}
* <li> DataStore {@link EventBus}
* <li> DataStore {@link DataStoreConfiguration}
* <li> {@link com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer}
* <li> @{@link DefaultJoinFilter} Supplier&lt;{@link Condition}&gt;
* <li> {@link Clock}
* </ul>
* Exports the following:
* <ul>
* <li> {@link Databus}
* <li> {@link DatabusFactory}
* <li> {@link DatabusEventStore}
* <li> {@link ReplicationSource}
* </ul>
Expand Down Expand Up @@ -150,8 +154,9 @@ protected void configure() {
expose(DatabusEventStore.class);

// Bind the Databus instance that the rest of the application will consume
bind(Databus.class).to(DefaultDatabus.class).asEagerSingleton();
expose(Databus.class);
bind(OwnerAwareDatabus.class).to(DefaultDatabus.class).asEagerSingleton();
bind(DatabusFactory.class).asEagerSingleton();
expose(DatabusFactory.class);

// Bind the cross-data center outbound replication end point
bind(ReplicationSource.class).to(DefaultReplicationSource.class).asEagerSingleton();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.bazaarvoice.emodb.databus;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@BindingAnnotation
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
public @interface SystemInternalId {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.bazaarvoice.emodb.databus.auth;

import com.bazaarvoice.emodb.databus.model.OwnedSubscription;

/**
* Simple {@link DatabusAuthorizer} implementation that either permits or denies all requests based on the provided
* value.
*/
public class ConstantDatabusAuthorizer implements DatabusAuthorizer {

private final ConstantDatabusAuthorizerForOwner _authorizer;

public static final ConstantDatabusAuthorizer ALLOW_ALL = new ConstantDatabusAuthorizer(true);
public static final ConstantDatabusAuthorizer DENY_ALL = new ConstantDatabusAuthorizer(false);

private ConstantDatabusAuthorizer(boolean authorize) {
_authorizer = new ConstantDatabusAuthorizerForOwner(authorize);
}

@Override
public DatabusAuthorizerByOwner owner(String ownerId) {
return _authorizer;
}

private class ConstantDatabusAuthorizerForOwner implements DatabusAuthorizerByOwner {
private final boolean _authorize;

private ConstantDatabusAuthorizerForOwner(boolean authorize) {
_authorize = authorize;
}

@Override
public boolean canAccessSubscription(OwnedSubscription subscription) {
return _authorize;
}

@Override
public boolean canReceiveEventsFromTable(String table) {
return _authorize;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.bazaarvoice.emodb.databus.auth;

import com.bazaarvoice.emodb.databus.model.OwnedSubscription;

/**
* This interface defines the interactions for authorizing databus subscription and fanout operations.
* In all cases the ownerId is the internal ID for a user.
*/
public interface DatabusAuthorizer {

DatabusAuthorizerByOwner owner(String ownerId);

interface DatabusAuthorizerByOwner {
/**
* Checks whether an owner has permission to resubscribe to or poll the provided subscription. Typically used
* in response to API subscribe and poll requests, respectively.
*/
boolean canAccessSubscription(OwnedSubscription subscription);

/**
* Checks whether an owner has permission to receive databus events on a given table when polling. Typically
* used during fanout to ensure the owner doesn't receive updates for documents he wouldn't have permission to read
* directly using the DataStore.
*/
boolean canReceiveEventsFromTable(String table);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.bazaarvoice.emodb.databus.auth;

import com.google.common.base.Objects;
import com.google.common.collect.Maps;

import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Implementation of DatabusAuthorizer that overrides authorization for specific owners. All other owners
* are optionally proxied to another instance.
*/
public class FilteredDatabusAuthorizer implements DatabusAuthorizer {

private final Map<String, DatabusAuthorizer> _ownerOverrides;
private final DatabusAuthorizer _authorizer;

private FilteredDatabusAuthorizer(Map<String, DatabusAuthorizer> ownerOverrides,
DatabusAuthorizer authorizer) {
_ownerOverrides = checkNotNull(ownerOverrides, "ownerOverrides");
_authorizer = checkNotNull(authorizer, "authorizer");
}

@Override
public DatabusAuthorizerByOwner owner(String ownerId) {
// TODO: To grandfather in subscriptions before API keys were enforced the following code
// always defers to the default authorizer if there is no owner. This code should be
// replaced with the commented-out version once enough time has passed for all grandfathered-in
// subscriptions to have been renewed and therefore have an owner attached.
//
// return Objects.firstNonNull(_ownerOverrides.get(ownerId), _authorizer).owner(ownerId);

DatabusAuthorizer authorizer = null;
if (ownerId != null) {
authorizer = _ownerOverrides.get(ownerId);
}
if (authorizer == null) {
authorizer = _authorizer;
}
return authorizer.owner(ownerId);
}

public static Builder builder() {
return new Builder();
}

/**
* Builder class for creating a FilteredDatabusAuthorizer.
*/
public static class Builder {
private final Map<String, DatabusAuthorizer> _ownerOverrides = Maps.newHashMap();
private DatabusAuthorizer _defaultAuthorizer;

private Builder() {
// no-op
}

public Builder withAuthorizerForOwner(String ownerId, DatabusAuthorizer authorizer) {
checkArgument(!_ownerOverrides.containsKey(ownerId), "Cannot assign multiple rules for owner");
_ownerOverrides.put(ownerId, authorizer);
return this;
}

public Builder withDefaultAuthorizer(DatabusAuthorizer defaultAuthorizer) {
checkArgument(_defaultAuthorizer == null, "Cannot assign multiple default authorizers");
_defaultAuthorizer = defaultAuthorizer;
return this;
}

public FilteredDatabusAuthorizer build() {
if (_defaultAuthorizer == null) {
// Unless specified the default behavior is to deny all access to subscriptions and tables
// not explicitly permitted.
_defaultAuthorizer = ConstantDatabusAuthorizer.DENY_ALL;
}
return new FilteredDatabusAuthorizer(_ownerOverrides, _defaultAuthorizer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.bazaarvoice.emodb.databus.auth;

import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* {@link DatabusAuthorizer} for system processes, such as the canary and databus replay.
*/
public class SystemProcessDatabusAuthorizer implements DatabusAuthorizer {

private final Logger _log = LoggerFactory.getLogger(getClass());

private final String _systemOwnerId;

private final DatabusAuthorizerByOwner _processAuthorizer = new DatabusAuthorizerByOwner() {
@Override
public boolean canAccessSubscription(OwnedSubscription subscription) {
// System should only access its own subscriptions
return _systemOwnerId.equals(subscription.getOwnerId());
}

@Override
public boolean canReceiveEventsFromTable(String table) {
// System needs to be able to poll on updates to all tables
return true;
}
};

public SystemProcessDatabusAuthorizer(String systemOwnerId) {
_systemOwnerId = checkNotNull(systemOwnerId, "systemOwnerId");
}

@Override
public DatabusAuthorizerByOwner owner(String ownerId) {
if (_systemOwnerId.equals(ownerId)) {
return _processAuthorizer;
}
_log.warn("Non-system owner attempted authorization from system authorizer: {}", ownerId);
return ConstantDatabusAuthorizer.DENY_ALL.owner(ownerId);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.bazaarvoice.emodb.databus.core;

import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
import com.bazaarvoice.emodb.databus.SystemInternalId;
import com.bazaarvoice.emodb.databus.ChannelNames;
import com.bazaarvoice.emodb.databus.api.Databus;
import com.bazaarvoice.emodb.event.owner.OstrichOwnerFactory;
Expand Down Expand Up @@ -37,7 +38,8 @@ public class CanaryManager {
public CanaryManager(final LifeCycleRegistry lifeCycle,
@DatabusClusterInfo Collection<ClusterInfo> clusterInfo,
Placements placements,
final Databus databus,
final DatabusFactory databusFactory,
final @SystemInternalId String systemInternalId,
final RateLimitedLogFactory logFactory,
OstrichOwnerGroupFactory ownerGroupFactory,
final MetricRegistry metricRegistry) {
Expand Down Expand Up @@ -84,6 +86,7 @@ public PartitionContext getContext(String cluster) {
public Service create(String clusterName) {
ClusterInfo cluster = checkNotNull(clusterInfoMap.get(clusterName), clusterName);
Condition condition = checkNotNull(clusterToConditionMap.get(clusterName), clusterName);
Databus databus = databusFactory.forOwner(systemInternalId);
return new Canary(cluster, condition, databus, logFactory, metricRegistry);
}
}, null);
Expand Down
Loading

0 comments on commit 45b1ae0

Please sign in to comment.