diff --git a/databus-api/src/main/java/com/bazaarvoice/emodb/databus/api/UnauthorizedSubscriptionException.java b/databus-api/src/main/java/com/bazaarvoice/emodb/databus/api/UnauthorizedSubscriptionException.java
new file mode 100644
index 0000000000..522e2b0085
--- /dev/null
+++ b/databus-api/src/main/java/com/bazaarvoice/emodb/databus/api/UnauthorizedSubscriptionException.java
@@ -0,0 +1,34 @@
+package com.bazaarvoice.emodb.databus.api;
+
+import com.bazaarvoice.emodb.common.api.UnauthorizedException;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Thrown when an unauthorized databus operation is performed on a subscription, such when a subscription created with
+ * one API key is polled using a different key.
+ */
+@JsonIgnoreProperties({"cause", "localizedMessage", "stackTrace", "message", "suppressed"})
+public class UnauthorizedSubscriptionException extends UnauthorizedException {
+ private final String _subscription;
+
+ public UnauthorizedSubscriptionException() {
+ _subscription = null;
+ }
+
+ public UnauthorizedSubscriptionException(String subscription) {
+ super(subscription);
+ _subscription = subscription;
+ }
+
+ @JsonCreator
+ public UnauthorizedSubscriptionException(@JsonProperty("reason") String message, @JsonProperty("subscription") String subscription) {
+ super(message);
+ _subscription = subscription;
+ }
+
+ public String getSubscription() {
+ return _subscription;
+ }
+}
\ No newline at end of file
diff --git a/databus-client-common/src/main/java/com/bazaarvoice/emodb/databus/client/DatabusClient.java b/databus-client-common/src/main/java/com/bazaarvoice/emodb/databus/client/DatabusClient.java
index 013803b1be..2f55fe8655 100644
--- a/databus-client-common/src/main/java/com/bazaarvoice/emodb/databus/client/DatabusClient.java
+++ b/databus-client-common/src/main/java/com/bazaarvoice/emodb/databus/client/DatabusClient.java
@@ -13,6 +13,7 @@
import com.bazaarvoice.emodb.databus.api.MoveSubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.ReplaySubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.Subscription;
+import com.bazaarvoice.emodb.databus.api.UnauthorizedSubscriptionException;
import com.bazaarvoice.emodb.databus.api.UnknownMoveException;
import com.bazaarvoice.emodb.databus.api.UnknownReplayException;
import com.bazaarvoice.emodb.databus.api.UnknownSubscriptionException;
@@ -419,6 +420,13 @@ private RuntimeException convertException(EmoClientException e) {
} else if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() &&
UnknownReplayException.class.getName().equals(exceptionType)) {
return response.getEntity(UnknownReplayException.class);
+ } else if (response.getStatus() == Response.Status.FORBIDDEN.getStatusCode() &&
+ UnauthorizedSubscriptionException.class.getName().equals(exceptionType)) {
+ if (response.hasEntity()) {
+ return (RuntimeException) response.getEntity(UnauthorizedSubscriptionException.class).initCause(e);
+ } else {
+ return (RuntimeException) new UnauthorizedSubscriptionException().initCause(e);
+ }
} else if (response.getStatus() == Response.Status.FORBIDDEN.getStatusCode() &&
UnauthorizedException.class.getName().equals(exceptionType)) {
if (response.hasEntity()) {
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/DatabusModule.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/DatabusModule.java
index b3bfd65887..f16315ec82 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/DatabusModule.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/DatabusModule.java
@@ -19,12 +19,14 @@
import com.bazaarvoice.emodb.databus.core.CanaryManager;
import com.bazaarvoice.emodb.databus.core.DatabusChannelConfiguration;
import com.bazaarvoice.emodb.databus.core.DatabusEventStore;
+import com.bazaarvoice.emodb.databus.core.DatabusFactory;
import com.bazaarvoice.emodb.databus.core.DedupMigrationTask;
import com.bazaarvoice.emodb.databus.core.DefaultDatabus;
import com.bazaarvoice.emodb.databus.core.DefaultFanoutManager;
import com.bazaarvoice.emodb.databus.core.DefaultRateLimitedLogFactory;
import com.bazaarvoice.emodb.databus.core.FanoutManager;
import com.bazaarvoice.emodb.databus.core.MasterFanout;
+import com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus;
import com.bazaarvoice.emodb.databus.core.RateLimitedLogFactory;
import com.bazaarvoice.emodb.databus.core.SubscriptionEvaluator;
import com.bazaarvoice.emodb.databus.core.SystemQueueMonitorManager;
@@ -86,15 +88,17 @@
*
@{@link Global} {@link CuratorFramework}
* Jersey {@link Client}
* @{@link ReplicationKey} String
+ * @{@link SystemInternalId} String
* DataStore {@link DataProvider}
* DataStore {@link EventBus}
* DataStore {@link DataStoreConfiguration}
+ * {@link com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer}
* @{@link DefaultJoinFilter} Supplier<{@link Condition}>
* {@link Clock}
*
* Exports the following:
*
- * - {@link Databus}
+ *
- {@link DatabusFactory}
*
- {@link DatabusEventStore}
*
- {@link ReplicationSource}
*
@@ -150,8 +154,9 @@ protected void configure() {
expose(DatabusEventStore.class);
// Bind the Databus instance that the rest of the application will consume
- bind(Databus.class).to(DefaultDatabus.class).asEagerSingleton();
- expose(Databus.class);
+ bind(OwnerAwareDatabus.class).to(DefaultDatabus.class).asEagerSingleton();
+ bind(DatabusFactory.class).asEagerSingleton();
+ expose(DatabusFactory.class);
// Bind the cross-data center outbound replication end point
bind(ReplicationSource.class).to(DefaultReplicationSource.class).asEagerSingleton();
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/SystemInternalId.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/SystemInternalId.java
new file mode 100644
index 0000000000..7c980edb0c
--- /dev/null
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/SystemInternalId.java
@@ -0,0 +1,16 @@
+package com.bazaarvoice.emodb.databus;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface SystemInternalId {
+}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/ConstantDatabusAuthorizer.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/ConstantDatabusAuthorizer.java
new file mode 100644
index 0000000000..3b9b6c4a3a
--- /dev/null
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/ConstantDatabusAuthorizer.java
@@ -0,0 +1,42 @@
+package com.bazaarvoice.emodb.databus.auth;
+
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
+
+/**
+ * Simple {@link DatabusAuthorizer} implementation that either permits or denies all requests based on the provided
+ * value.
+ */
+public class ConstantDatabusAuthorizer implements DatabusAuthorizer {
+
+ private final ConstantDatabusAuthorizerForOwner _authorizer;
+
+ public static final ConstantDatabusAuthorizer ALLOW_ALL = new ConstantDatabusAuthorizer(true);
+ public static final ConstantDatabusAuthorizer DENY_ALL = new ConstantDatabusAuthorizer(false);
+
+ private ConstantDatabusAuthorizer(boolean authorize) {
+ _authorizer = new ConstantDatabusAuthorizerForOwner(authorize);
+ }
+
+ @Override
+ public DatabusAuthorizerByOwner owner(String ownerId) {
+ return _authorizer;
+ }
+
+ private class ConstantDatabusAuthorizerForOwner implements DatabusAuthorizerByOwner {
+ private final boolean _authorize;
+
+ private ConstantDatabusAuthorizerForOwner(boolean authorize) {
+ _authorize = authorize;
+ }
+
+ @Override
+ public boolean canAccessSubscription(OwnedSubscription subscription) {
+ return _authorize;
+ }
+
+ @Override
+ public boolean canReceiveEventsFromTable(String table) {
+ return _authorize;
+ }
+ }
+}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/DatabusAuthorizer.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/DatabusAuthorizer.java
new file mode 100644
index 0000000000..afc724dbd5
--- /dev/null
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/DatabusAuthorizer.java
@@ -0,0 +1,28 @@
+package com.bazaarvoice.emodb.databus.auth;
+
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
+
+/**
+ * This interface defines the interactions for authorizing databus subscription and fanout operations.
+ * In all cases the ownerId is the internal ID for a user.
+ */
+public interface DatabusAuthorizer {
+
+ DatabusAuthorizerByOwner owner(String ownerId);
+
+ interface DatabusAuthorizerByOwner {
+ /**
+ * Checks whether an owner has permission to resubscribe to or poll the provided subscription. Typically used
+ * in response to API subscribe and poll requests, respectively.
+ */
+ boolean canAccessSubscription(OwnedSubscription subscription);
+
+ /**
+ * Checks whether an owner has permission to receive databus events on a given table when polling. Typically
+ * used during fanout to ensure the owner doesn't receive updates for documents he wouldn't have permission to read
+ * directly using the DataStore.
+ */
+ boolean canReceiveEventsFromTable(String table);
+
+ }
+}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/FilteredDatabusAuthorizer.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/FilteredDatabusAuthorizer.java
new file mode 100644
index 0000000000..eb41ae0e7a
--- /dev/null
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/FilteredDatabusAuthorizer.java
@@ -0,0 +1,81 @@
+package com.bazaarvoice.emodb.databus.auth;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Implementation of DatabusAuthorizer that overrides authorization for specific owners. All other owners
+ * are optionally proxied to another instance.
+ */
+public class FilteredDatabusAuthorizer implements DatabusAuthorizer {
+
+ private final Map _ownerOverrides;
+ private final DatabusAuthorizer _authorizer;
+
+ private FilteredDatabusAuthorizer(Map ownerOverrides,
+ DatabusAuthorizer authorizer) {
+ _ownerOverrides = checkNotNull(ownerOverrides, "ownerOverrides");
+ _authorizer = checkNotNull(authorizer, "authorizer");
+ }
+
+ @Override
+ public DatabusAuthorizerByOwner owner(String ownerId) {
+ // TODO: To grandfather in subscriptions before API keys were enforced the following code
+ // always defers to the default authorizer if there is no owner. This code should be
+ // replaced with the commented-out version once enough time has passed for all grandfathered-in
+ // subscriptions to have been renewed and therefore have an owner attached.
+ //
+ // return Objects.firstNonNull(_ownerOverrides.get(ownerId), _authorizer).owner(ownerId);
+
+ DatabusAuthorizer authorizer = null;
+ if (ownerId != null) {
+ authorizer = _ownerOverrides.get(ownerId);
+ }
+ if (authorizer == null) {
+ authorizer = _authorizer;
+ }
+ return authorizer.owner(ownerId);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder class for creating a FilteredDatabusAuthorizer.
+ */
+ public static class Builder {
+ private final Map _ownerOverrides = Maps.newHashMap();
+ private DatabusAuthorizer _defaultAuthorizer;
+
+ private Builder() {
+ // no-op
+ }
+
+ public Builder withAuthorizerForOwner(String ownerId, DatabusAuthorizer authorizer) {
+ checkArgument(!_ownerOverrides.containsKey(ownerId), "Cannot assign multiple rules for owner");
+ _ownerOverrides.put(ownerId, authorizer);
+ return this;
+ }
+
+ public Builder withDefaultAuthorizer(DatabusAuthorizer defaultAuthorizer) {
+ checkArgument(_defaultAuthorizer == null, "Cannot assign multiple default authorizers");
+ _defaultAuthorizer = defaultAuthorizer;
+ return this;
+ }
+
+ public FilteredDatabusAuthorizer build() {
+ if (_defaultAuthorizer == null) {
+ // Unless specified the default behavior is to deny all access to subscriptions and tables
+ // not explicitly permitted.
+ _defaultAuthorizer = ConstantDatabusAuthorizer.DENY_ALL;
+ }
+ return new FilteredDatabusAuthorizer(_ownerOverrides, _defaultAuthorizer);
+ }
+ }
+}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/SystemProcessDatabusAuthorizer.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/SystemProcessDatabusAuthorizer.java
new file mode 100644
index 0000000000..b4308b7bf4
--- /dev/null
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/auth/SystemProcessDatabusAuthorizer.java
@@ -0,0 +1,44 @@
+package com.bazaarvoice.emodb.databus.auth;
+
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link DatabusAuthorizer} for system processes, such as the canary and databus replay.
+ */
+public class SystemProcessDatabusAuthorizer implements DatabusAuthorizer {
+
+ private final Logger _log = LoggerFactory.getLogger(getClass());
+
+ private final String _systemOwnerId;
+
+ private final DatabusAuthorizerByOwner _processAuthorizer = new DatabusAuthorizerByOwner() {
+ @Override
+ public boolean canAccessSubscription(OwnedSubscription subscription) {
+ // System should only access its own subscriptions
+ return _systemOwnerId.equals(subscription.getOwnerId());
+ }
+
+ @Override
+ public boolean canReceiveEventsFromTable(String table) {
+ // System needs to be able to poll on updates to all tables
+ return true;
+ }
+ };
+
+ public SystemProcessDatabusAuthorizer(String systemOwnerId) {
+ _systemOwnerId = checkNotNull(systemOwnerId, "systemOwnerId");
+ }
+
+ @Override
+ public DatabusAuthorizerByOwner owner(String ownerId) {
+ if (_systemOwnerId.equals(ownerId)) {
+ return _processAuthorizer;
+ }
+ _log.warn("Non-system owner attempted authorization from system authorizer: {}", ownerId);
+ return ConstantDatabusAuthorizer.DENY_ALL.owner(ownerId);
+ }
+}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/CanaryManager.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/CanaryManager.java
index 6551a9a0c4..55c32de671 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/CanaryManager.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/CanaryManager.java
@@ -1,6 +1,7 @@
package com.bazaarvoice.emodb.databus.core;
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
+import com.bazaarvoice.emodb.databus.SystemInternalId;
import com.bazaarvoice.emodb.databus.ChannelNames;
import com.bazaarvoice.emodb.databus.api.Databus;
import com.bazaarvoice.emodb.event.owner.OstrichOwnerFactory;
@@ -37,7 +38,8 @@ public class CanaryManager {
public CanaryManager(final LifeCycleRegistry lifeCycle,
@DatabusClusterInfo Collection clusterInfo,
Placements placements,
- final Databus databus,
+ final DatabusFactory databusFactory,
+ final @SystemInternalId String systemInternalId,
final RateLimitedLogFactory logFactory,
OstrichOwnerGroupFactory ownerGroupFactory,
final MetricRegistry metricRegistry) {
@@ -84,6 +86,7 @@ public PartitionContext getContext(String cluster) {
public Service create(String clusterName) {
ClusterInfo cluster = checkNotNull(clusterInfoMap.get(clusterName), clusterName);
Condition condition = checkNotNull(clusterToConditionMap.get(clusterName), clusterName);
+ Databus databus = databusFactory.forOwner(systemInternalId);
return new Canary(cluster, condition, databus, logFactory, metricRegistry);
}
}, null);
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DatabusFactory.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DatabusFactory.java
new file mode 100644
index 0000000000..b04bf378a7
--- /dev/null
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DatabusFactory.java
@@ -0,0 +1,147 @@
+package com.bazaarvoice.emodb.databus.core;
+
+import com.bazaarvoice.emodb.databus.api.Databus;
+import com.bazaarvoice.emodb.databus.api.Event;
+import com.bazaarvoice.emodb.databus.api.MoveSubscriptionStatus;
+import com.bazaarvoice.emodb.databus.api.ReplaySubscriptionStatus;
+import com.bazaarvoice.emodb.databus.api.Subscription;
+import com.bazaarvoice.emodb.databus.api.UnknownSubscriptionException;
+import com.bazaarvoice.emodb.sor.condition.Condition;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * When used internally within the EmoDB server databus operations require the context of the owner which is making each
+ * databus request as provided by {@link OwnerAwareDatabus}. However, it is inconvenient to use nominally
+ * different interfaces throughout the system and to require passing around an owner ID throughout the stack in order
+ * to use OwnerAwareDatabus.
+ *
+ * The purpose of DatabusFactory is to provide a proxy for providing {@link Databus} interface access to the
+ * OwnerAwareDatabus based on the owner ID passed to {@link #forOwner(String)}.
+ */
+public class DatabusFactory {
+
+ private final OwnerAwareDatabus _ownerAwareDatabus;
+
+ @Inject
+ public DatabusFactory(OwnerAwareDatabus ownerAwareDatabus) {
+ _ownerAwareDatabus = ownerAwareDatabus;
+ }
+
+ public Databus forOwner(final String ownerId) {
+ checkNotNull(ownerId, "ownerId");
+
+ /**
+ * Proxy class for Databus that simply inserts the owner ID where appropriate.
+ */
+ return new Databus() {
+ @Override
+ public Iterator listSubscriptions(@Nullable String fromSubscriptionExclusive, long limit) {
+ return _ownerAwareDatabus.listSubscriptions(ownerId, fromSubscriptionExclusive, limit);
+ }
+
+ @Override
+ public void subscribe(String subscription, Condition tableFilter, Duration subscriptionTtl, Duration eventTtl) {
+ _ownerAwareDatabus.subscribe(ownerId, subscription, tableFilter, subscriptionTtl, eventTtl);
+ }
+
+ @Override
+ public void subscribe(String subscription, Condition tableFilter, Duration subscriptionTtl, Duration eventTtl, boolean ignoreSuppressedEvents) {
+ _ownerAwareDatabus.subscribe(ownerId, subscription, tableFilter, subscriptionTtl, eventTtl, ignoreSuppressedEvents);
+ }
+
+ @Override
+ public void unsubscribe(String subscription) {
+ _ownerAwareDatabus.unsubscribe(ownerId, subscription);
+ }
+
+ @Override
+ public Subscription getSubscription(String subscription) throws UnknownSubscriptionException {
+ return _ownerAwareDatabus.getSubscription(ownerId, subscription);
+ }
+
+ @Override
+ public long getEventCount(String subscription) {
+ return _ownerAwareDatabus.getEventCount(ownerId, subscription);
+ }
+
+ @Override
+ public long getEventCountUpTo(String subscription, long limit) {
+ return _ownerAwareDatabus.getEventCountUpTo(ownerId, subscription, limit);
+ }
+
+ @Override
+ public long getClaimCount(String subscription) {
+ return _ownerAwareDatabus.getClaimCount(ownerId, subscription);
+ }
+
+ @Override
+ public List peek(String subscription, int limit) {
+ return _ownerAwareDatabus.peek(ownerId, subscription, limit);
+ }
+
+ @Override
+ public List poll(String subscription, Duration claimTtl, int limit) {
+ return _ownerAwareDatabus.poll(ownerId, subscription, claimTtl, limit);
+ }
+
+ @Override
+ public void renew(String subscription, Collection eventKeys, Duration claimTtl) {
+ _ownerAwareDatabus.renew(ownerId, subscription, eventKeys, claimTtl);
+ }
+
+ @Override
+ public void acknowledge(String subscription, Collection eventKeys) {
+ _ownerAwareDatabus.acknowledge(ownerId, subscription, eventKeys);
+ }
+
+ @Override
+ public String replayAsync(String subscription) {
+ return _ownerAwareDatabus.replayAsync(ownerId, subscription);
+ }
+
+ @Override
+ public String replayAsyncSince(String subscription, Date since) {
+ return _ownerAwareDatabus.replayAsyncSince(ownerId, subscription, since);
+ }
+
+ @Override
+ public ReplaySubscriptionStatus getReplayStatus(String reference) {
+ return _ownerAwareDatabus.getReplayStatus(ownerId, reference);
+ }
+
+ @Override
+ public String moveAsync(String from, String to) {
+ return _ownerAwareDatabus.moveAsync(ownerId, from, to);
+ }
+
+ @Override
+ public MoveSubscriptionStatus getMoveStatus(String reference) {
+ return _ownerAwareDatabus.getMoveStatus(ownerId, reference);
+ }
+
+ @Override
+ public void injectEvent(String subscription, String table, String key) {
+ _ownerAwareDatabus.injectEvent(ownerId, subscription, table, key);
+ }
+
+ @Override
+ public void unclaimAll(String subscription) {
+ _ownerAwareDatabus.unclaimAll(ownerId, subscription);
+ }
+
+ @Override
+ public void purge(String subscription) {
+ _ownerAwareDatabus.purge(ownerId, subscription);
+ }
+ };
+ }
+}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultDatabus.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultDatabus.java
index 8603dc16af..f0c5fccb82 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultDatabus.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultDatabus.java
@@ -5,16 +5,19 @@
import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.databus.ChannelNames;
import com.bazaarvoice.emodb.databus.DefaultJoinFilter;
-import com.bazaarvoice.emodb.databus.api.Databus;
+import com.bazaarvoice.emodb.databus.SystemInternalId;
import com.bazaarvoice.emodb.databus.api.Event;
import com.bazaarvoice.emodb.databus.api.MoveSubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.Names;
import com.bazaarvoice.emodb.databus.api.ReplaySubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.Subscription;
+import com.bazaarvoice.emodb.databus.api.UnauthorizedSubscriptionException;
import com.bazaarvoice.emodb.databus.api.UnknownMoveException;
import com.bazaarvoice.emodb.databus.api.UnknownReplayException;
import com.bazaarvoice.emodb.databus.api.UnknownSubscriptionException;
+import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
import com.bazaarvoice.emodb.event.api.EventData;
import com.bazaarvoice.emodb.event.api.EventSink;
import com.bazaarvoice.emodb.event.core.SizeCacheKey;
@@ -75,7 +78,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-public class DefaultDatabus implements Databus, Managed {
+public class DefaultDatabus implements OwnerAwareDatabus, Managed {
/** How long should poll loop, searching for events before giving up and returning. */
private static final Duration MAX_POLL_TIME = Duration.millis(100);
@@ -94,6 +97,8 @@ public class DefaultDatabus implements Databus, Managed {
private final DataProvider _dataProvider;
private final SubscriptionEvaluator _subscriptionEvaluator;
private final JobService _jobService;
+ private final DatabusAuthorizer _databusAuthorizer;
+ private final String _systemOwnerId;
private final Meter _peekedMeter;
private final Meter _polledMeter;
private final Meter _renewedMeter;
@@ -103,6 +108,7 @@ public class DefaultDatabus implements Databus, Managed {
private final Meter _redundantMeter;
private final Meter _discardedMeter;
private final Meter _consolidatedMeter;
+ private final Meter _unownedSubscriptionMeter;
private final LoadingCache> _eventSizeCache;
private final Supplier _defaultJoinFilterCondition;
private final Ticker _ticker;
@@ -112,14 +118,18 @@ public class DefaultDatabus implements Databus, Managed {
public DefaultDatabus(LifeCycleRegistry lifeCycle, EventBus eventBus, DataProvider dataProvider,
SubscriptionDAO subscriptionDao, DatabusEventStore eventStore,
SubscriptionEvaluator subscriptionEvaluator, JobService jobService,
- JobHandlerRegistry jobHandlerRegistry, MetricRegistry metricRegistry,
- @DefaultJoinFilter Supplier defaultJoinFilterCondition, Clock clock) {
+ JobHandlerRegistry jobHandlerRegistry, DatabusAuthorizer databusAuthorizer,
+ @SystemInternalId String systemOwnerId,
+ @DefaultJoinFilter Supplier defaultJoinFilterCondition,
+ MetricRegistry metricRegistry, Clock clock) {
_eventBus = eventBus;
_subscriptionDao = subscriptionDao;
_eventStore = eventStore;
_dataProvider = dataProvider;
_subscriptionEvaluator = subscriptionEvaluator;
_jobService = jobService;
+ _databusAuthorizer = databusAuthorizer;
+ _systemOwnerId = systemOwnerId;
_defaultJoinFilterCondition = defaultJoinFilterCondition;
_ticker = ClockTicker.getTicker(clock);
_clock = clock;
@@ -132,6 +142,7 @@ public DefaultDatabus(LifeCycleRegistry lifeCycle, EventBus eventBus, DataProvid
_redundantMeter = newEventMeter("redundant", metricRegistry);
_discardedMeter = newEventMeter("discarded", metricRegistry);
_consolidatedMeter = newEventMeter("consolidated", metricRegistry);
+ _unownedSubscriptionMeter = newEventMeter("unowned", metricRegistry);
_eventSizeCache = CacheBuilder.newBuilder()
.expireAfterWrite(15, TimeUnit.SECONDS)
.maximumSize(2000)
@@ -161,6 +172,10 @@ public JobHandler get() {
public MoveSubscriptionResult run(MoveSubscriptionRequest request)
throws Exception {
try {
+ // Last chance to verify the subscriptions' owner before doing anything mutative
+ checkSubscriptionOwner(request.getOwnerId(), request.getFrom());
+ checkSubscriptionOwner(request.getOwnerId(), request.getTo());
+
_eventStore.move(request.getFrom(), request.getTo());
} catch (ReadOnlyQueueException e) {
// The from queue is not owned by this server.
@@ -184,6 +199,9 @@ public JobHandler get() {
public ReplaySubscriptionResult run(ReplaySubscriptionRequest request)
throws Exception {
try {
+ // Last chance to verify the subscription's owner before doing anything mutative
+ checkSubscriptionOwner(request.getOwnerId(), request.getSubscription());
+
replay(request.getSubscription(), request.getSince());
} catch (ReadOnlyQueueException e) {
// The subscription is not owned by this server.
@@ -207,7 +225,7 @@ public ReplaySubscriptionResult run(ReplaySubscriptionRequest request)
private void createDatabusReplaySubscription() {
// Create a master databus replay subscription where the events expire every 50 hours (2 days + 2 hours)
- subscribe(ChannelNames.getMasterReplayChannel(), Conditions.alwaysTrue(),
+ subscribe(_systemOwnerId, ChannelNames.getMasterReplayChannel(), Conditions.alwaysTrue(),
Duration.standardDays(3650), DatabusChannelConfiguration.REPLAY_TTL, false);
}
@@ -232,23 +250,23 @@ public void stop() throws Exception {
}
@Override
- public Iterator listSubscriptions(@Nullable String fromSubscriptionExclusive, long limit) {
+ public Iterator listSubscriptions(final String ownerId, @Nullable String fromSubscriptionExclusive, long limit) {
checkArgument(limit > 0, "Limit must be >0");
// We always have all the subscriptions cached in memory so fetch them all.
- Collection subscriptions = _subscriptionDao.getAllSubscriptions();
+ Collection subscriptions = _subscriptionDao.getAllSubscriptions();
- // Ignore internal subscriptions (eg. "__system_bus:canary").
- subscriptions = Collections2.filter(subscriptions, new Predicate() {
+ // Ignore subscriptions not accessible by the owner.
+ subscriptions = Collections2.filter(subscriptions, new Predicate() {
@Override
- public boolean apply(Subscription subscription) {
- return !subscription.getName().startsWith("__");
+ public boolean apply(OwnedSubscription subscription) {
+ return _databusAuthorizer.owner(ownerId).canAccessSubscription(subscription);
}
});
// Sort them by name. They're stored sorted in Cassandra so this should be a no-op, but
// do the sort anyway so we're not depending on internals of the subscription DAO.
- List sorted = new Ordering() {
+ List extends Subscription> sorted = new Ordering() {
@Override
public int compare(Subscription left, Subscription right) {
return left.getName().compareTo(right.getName());
@@ -271,23 +289,26 @@ public int compare(Subscription left, Subscription right) {
sorted = sorted.subList(0, (int) limit);
}
- return sorted.iterator();
+ //noinspection unchecked
+ return (Iterator) sorted.iterator();
}
@Override
- public void subscribe(String subscription, Condition tableFilter, Duration subscriptionTtl, Duration eventTtl) {
- subscribe(subscription, tableFilter, subscriptionTtl, eventTtl, true);
+ public void subscribe(String ownerId, String subscription, Condition tableFilter, Duration subscriptionTtl, Duration eventTtl) {
+ subscribe(ownerId, subscription, tableFilter, subscriptionTtl, eventTtl, true);
}
@Override
- public void subscribe(String subscription, Condition tableFilter, Duration subscriptionTtl, Duration eventTtl,
- boolean includeDefaultJoinFilter) {
- // This call should be depracated soon.
+ public void subscribe(String ownerId, String subscription, Condition tableFilter, Duration subscriptionTtl,
+ Duration eventTtl, boolean includeDefaultJoinFilter) {
+ // This call should be deprecated soon.
checkLegalSubscriptionName(subscription);
+ checkSubscriptionOwner(ownerId, subscription);
checkNotNull(tableFilter, "tableFilter");
checkArgument(subscriptionTtl.isLongerThan(Duration.ZERO), "SubscriptionTtl must be >0");
checkArgument(eventTtl.isLongerThan(Duration.ZERO), "EventTtl must be >0");
TableFilterValidator.checkAllowed(tableFilter);
+
if (includeDefaultJoinFilter) {
// If the default join filter condition is set (that is, isn't "alwaysTrue()") then add it to the filter
Condition defaultJoinFilterCondition = _defaultJoinFilterCondition.get();
@@ -303,22 +324,30 @@ public void subscribe(String subscription, Condition tableFilter, Duration subsc
// except for resetting the ttl, recreating a subscription that already exists has no effect.
// assume that multiple servers that manage the same subscriptions can each attempt to create
// the subscription at startup.
- _subscriptionDao.insertSubscription(subscription, tableFilter, subscriptionTtl, eventTtl);
+ _subscriptionDao.insertSubscription(ownerId, subscription, tableFilter, subscriptionTtl, eventTtl);
}
@Override
- public void unsubscribe(String subscription) {
+ public void unsubscribe(String ownerId, String subscription) {
checkLegalSubscriptionName(subscription);
+ checkSubscriptionOwner(ownerId, subscription);
_subscriptionDao.deleteSubscription(subscription);
_eventStore.purge(subscription);
}
@Override
- public Subscription getSubscription(String name) throws UnknownSubscriptionException {
+ public Subscription getSubscription(String ownerId, String name) throws UnknownSubscriptionException {
checkLegalSubscriptionName(name);
- Subscription subscription = _subscriptionDao.getSubscription(name);
+ OwnedSubscription subscription = getSubscriptionByName(name);
+ checkSubscriptionOwner(ownerId, subscription);
+
+ return subscription;
+ }
+
+ private OwnedSubscription getSubscriptionByName(String name) {
+ OwnedSubscription subscription = _subscriptionDao.getSubscription(name);
if (subscription == null) {
throw new UnknownSubscriptionException(name);
}
@@ -335,12 +364,14 @@ public void onUpdateIntent(UpdateIntentEvent event) {
}
@Override
- public long getEventCount(String subscription) {
- return getEventCountUpTo(subscription, Long.MAX_VALUE);
+ public long getEventCount(String ownerId, String subscription) {
+ return getEventCountUpTo(ownerId, subscription, Long.MAX_VALUE);
}
@Override
- public long getEventCountUpTo(String subscription, long limit) {
+ public long getEventCountUpTo(String ownerId, String subscription, long limit) {
+ checkSubscriptionOwner(ownerId, subscription);
+
// We get the size from cache as a tuple of size, and the limit used to estimate that size
// So, the key is the size, and value is the limit used to estimate the size
SizeCacheKey sizeCacheKey = new SizeCacheKey(subscription, limit);
@@ -361,16 +392,18 @@ private long internalEventCountUpTo(String subscription, long limit) {
}
@Override
- public long getClaimCount(String subscription) {
+ public long getClaimCount(String ownerId, String subscription) {
checkLegalSubscriptionName(subscription);
+ checkSubscriptionOwner(ownerId, subscription);
return _eventStore.getClaimCount(subscription);
}
@Override
- public List peek(final String subscription, int limit) {
+ public List peek(String ownerId, final String subscription, int limit) {
checkLegalSubscriptionName(subscription);
checkArgument(limit > 0, "Limit must be >0");
+ checkSubscriptionOwner(ownerId, subscription);
List events = peekOrPoll(subscription, null, limit);
_peekedMeter.mark(events.size());
@@ -378,10 +411,11 @@ public List peek(final String subscription, int limit) {
}
@Override
- public List poll(final String subscription, final Duration claimTtl, int limit) {
+ public List poll(String ownerId, final String subscription, final Duration claimTtl, int limit) {
checkLegalSubscriptionName(subscription);
checkArgument(claimTtl.getMillis() >= 0, "ClaimTtl must be >=0");
checkArgument(limit > 0, "Limit must be >0");
+ checkSubscriptionOwner(ownerId, subscription);
List events = peekOrPoll(subscription, claimTtl, limit);
_polledMeter.mark(events.size());
@@ -541,36 +575,39 @@ private boolean isRecent(UUID changeId) {
}
@Override
- public void renew(String subscription, Collection eventKeys, Duration claimTtl) {
+ public void renew(String ownerId, String subscription, Collection eventKeys, Duration claimTtl) {
checkLegalSubscriptionName(subscription);
checkNotNull(eventKeys, "eventKeys");
checkArgument(claimTtl.getMillis() >= 0, "ClaimTtl must be >=0");
+ checkSubscriptionOwner(ownerId, subscription);
_eventStore.renew(subscription, EventKeyFormat.decodeAll(eventKeys), claimTtl, true);
_renewedMeter.mark(eventKeys.size());
}
@Override
- public void acknowledge(String subscription, Collection eventKeys) {
+ public void acknowledge(String ownerId, String subscription, Collection eventKeys) {
checkLegalSubscriptionName(subscription);
checkNotNull(eventKeys, "eventKeys");
+ checkSubscriptionOwner(ownerId, subscription);
_eventStore.delete(subscription, EventKeyFormat.decodeAll(eventKeys), true);
_ackedMeter.mark(eventKeys.size());
}
@Override
- public String replayAsync(String subscription) {
- return replayAsyncSince(subscription, null);
+ public String replayAsync(String ownerId, String subscription) {
+ return replayAsyncSince(ownerId, subscription, null);
}
@Override
- public String replayAsyncSince(String subscription, Date since) {
+ public String replayAsyncSince(String ownerId, String subscription, Date since) {
checkLegalSubscriptionName(subscription);
+ checkSubscriptionOwner(ownerId, subscription);
JobIdentifier jobId =
_jobService.submitJob(
- new JobRequest<>(ReplaySubscriptionJob.INSTANCE, new ReplaySubscriptionRequest(subscription, since)));
+ new JobRequest<>(ReplaySubscriptionJob.INSTANCE, new ReplaySubscriptionRequest(ownerId, subscription, since)));
return jobId.toString();
}
@@ -580,18 +617,25 @@ public void replay(String subscription, Date since) {
checkState(since == null || new DateTime(since).plus(DatabusChannelConfiguration.REPLAY_TTL).isAfterNow(),
"Since timestamp is outside the replay TTL.");
String source = ChannelNames.getMasterReplayChannel();
- final Subscription destination = getSubscription(subscription);
+ final OwnedSubscription destination = getSubscriptionByName(subscription);
+ final DatabusAuthorizer.DatabusAuthorizerByOwner authorizer = _databusAuthorizer.owner(destination.getOwnerId());
_eventStore.copy(source, subscription, new Predicate() {
@Override
- public boolean apply(ByteBuffer eventData) {
- return _subscriptionEvaluator.matches(destination, eventData);
+ public boolean apply(ByteBuffer eventDataBytes) {
+ try {
+ SubscriptionEvaluator.MatchEventData eventData = _subscriptionEvaluator.getMatchEventData(eventDataBytes);
+ return _subscriptionEvaluator.matches(destination, eventData)
+ && authorizer.canReceiveEventsFromTable(eventData.getTable().getName());
+ } catch (UnknownTableException e) {
+ return false;
+ }
}
}, since);
}
@Override
- public ReplaySubscriptionStatus getReplayStatus(String reference) {
+ public ReplaySubscriptionStatus getReplayStatus(String ownerId, String reference) {
checkNotNull(reference, "reference");
JobIdentifier jobId;
@@ -613,6 +657,8 @@ public ReplaySubscriptionStatus getReplayStatus(String reference) {
throw new IllegalStateException("Replay request details not found: " + jobId);
}
+ checkSubscriptionOwner(ownerId, request.getSubscription());
+
switch (status.getStatus()) {
case FINISHED:
return new ReplaySubscriptionStatus(request.getSubscription(), ReplaySubscriptionStatus.Status.COMPLETE);
@@ -626,18 +672,21 @@ public ReplaySubscriptionStatus getReplayStatus(String reference) {
}
@Override
- public String moveAsync(String from, String to) {
+ public String moveAsync(String ownerId, String from, String to) {
checkLegalSubscriptionName(from);
checkLegalSubscriptionName(to);
+ checkSubscriptionOwner(ownerId, from);
+ checkSubscriptionOwner(ownerId, to);
JobIdentifier jobId =
- _jobService.submitJob(new JobRequest<>(MoveSubscriptionJob.INSTANCE, new MoveSubscriptionRequest(from, to)));
+ _jobService.submitJob(new JobRequest<>(
+ MoveSubscriptionJob.INSTANCE, new MoveSubscriptionRequest(ownerId, from, to)));
return jobId.toString();
}
@Override
- public MoveSubscriptionStatus getMoveStatus(String reference) {
+ public MoveSubscriptionStatus getMoveStatus(String ownerId, String reference) {
checkNotNull(reference, "reference");
JobIdentifier jobId;
@@ -659,6 +708,8 @@ public MoveSubscriptionStatus getMoveStatus(String reference) {
throw new IllegalStateException("Move request details not found: " + jobId);
}
+ checkSubscriptionOwner(ownerId, request.getFrom());
+
switch (status.getStatus()) {
case FINISHED:
return new MoveSubscriptionStatus(request.getFrom(), request.getTo(), MoveSubscriptionStatus.Status.COMPLETE);
@@ -672,23 +723,26 @@ public MoveSubscriptionStatus getMoveStatus(String reference) {
}
@Override
- public void injectEvent(String subscription, String table, String key) {
+ public void injectEvent(String ownerId, String subscription, String table, String key) {
// Pick a changeId UUID that's guaranteed to be older than the compaction cutoff so poll()'s calls to
// AnnotatedContent.isChangeDeltaPending() and isChangeDeltaRedundant() will always return false.
+ checkSubscriptionOwner(ownerId, subscription);
UpdateRef ref = new UpdateRef(table, key, TimeUUIDs.minimumUuid(), ImmutableSet.of());
_eventStore.add(subscription, UpdateRefSerializer.toByteBuffer(ref));
}
@Override
- public void unclaimAll(String subscription) {
+ public void unclaimAll(String ownerId, String subscription) {
checkLegalSubscriptionName(subscription);
+ checkSubscriptionOwner(ownerId, subscription);
_eventStore.unclaimAll(subscription);
}
@Override
- public void purge(String subscription) {
+ public void purge(String ownerId, String subscription) {
checkLegalSubscriptionName(subscription);
+ checkSubscriptionOwner(ownerId, subscription);
_eventStore.purge(subscription);
}
@@ -700,6 +754,25 @@ private void checkLegalSubscriptionName(String subscription) {
"An example of a valid subscription name would be 'polloi:review'.");
}
+ private void checkSubscriptionOwner(String ownerId, String subscription) {
+ // Verify the subscription either doesn't exist or is already owned by the same owner. In practice this is
+ // predominantly cached by SubscriptionDAO so performance should be good.
+ checkSubscriptionOwner(ownerId, _subscriptionDao.getSubscription(subscription));
+ }
+
+ private void checkSubscriptionOwner(String ownerId, OwnedSubscription subscription) {
+ checkNotNull(ownerId, "ownerId");
+ if (subscription != null) {
+ // Grandfather-in subscriptions created before ownership was introduced. This should be a temporary issue
+ // since the subscriptions will need to renew at some point or expire.
+ if (subscription.getOwnerId() == null) {
+ _unownedSubscriptionMeter.mark();
+ } else if (!_databusAuthorizer.owner(ownerId).canAccessSubscription(subscription)) {
+ throw new UnauthorizedSubscriptionException("Not subscriber", subscription.getName());
+ }
+ }
+ }
+
/** EventStore sink that doesn't count adjacent events for the same table/key against the peek/poll limit. */
private class ConsolidatingEventSink implements EventSink {
private final Map _eventMap = Maps.newLinkedHashMap();
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultFanout.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultFanout.java
index 5f5ee33963..c5a9809ba2 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultFanout.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultFanout.java
@@ -2,12 +2,14 @@
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.ServiceFailureListener;
import com.bazaarvoice.emodb.databus.ChannelNames;
-import com.bazaarvoice.emodb.databus.api.Subscription;
+import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
import com.bazaarvoice.emodb.datacenter.api.DataCenter;
import com.bazaarvoice.emodb.event.api.EventData;
import com.bazaarvoice.emodb.sor.api.UnknownTableException;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ArrayListMultimap;
@@ -44,10 +46,11 @@ public class DefaultFanout extends AbstractScheduledService {
private final Function, Void> _eventSink;
private final boolean _replicateOutbound;
private final Duration _sleepWhenIdle;
- private final Supplier> _subscriptionsSupplier;
+ private final Supplier> _subscriptionsSupplier;
private final DataCenter _currentDataCenter;
private final RateLimitedLog _rateLimitedLog;
private final SubscriptionEvaluator _subscriptionEvaluator;
+ private final DatabusAuthorizer _databusAuthorizer;
private final Meter _eventsRead;
private final Meter _eventsWrittenLocal;
private final Meter _eventsWrittenOutboundReplication;
@@ -57,10 +60,11 @@ public DefaultFanout(String name,
Function, Void> eventSink,
boolean replicateOutbound,
Duration sleepWhenIdle,
- Supplier> subscriptionsSupplier,
+ Supplier> subscriptionsSupplier,
DataCenter currentDataCenter,
RateLimitedLogFactory logFactory,
SubscriptionEvaluator subscriptionEvaluator,
+ DatabusAuthorizer databusAuthorizer,
MetricRegistry metricRegistry) {
_name = checkNotNull(name, "name");
_eventSource = checkNotNull(eventSource, "eventSource");
@@ -70,6 +74,7 @@ public DefaultFanout(String name,
_subscriptionsSupplier = checkNotNull(subscriptionsSupplier, "subscriptionsSupplier");
_currentDataCenter = checkNotNull(currentDataCenter, "currentDataCenter");
_subscriptionEvaluator = checkNotNull(subscriptionEvaluator, "subscriptionEvaluator");
+ _databusAuthorizer = checkNotNull(databusAuthorizer, "databusAuthorizer");
_rateLimitedLog = logFactory.from(_log);
_eventsRead = newEventMeter("read", metricRegistry);
@@ -114,13 +119,14 @@ private boolean copyEvents() {
}
// Last chance to check that we are the leader before doing anything that would be bad if we aren't.
- if (!isRunning()) {
- return false;
- }
+ return isRunning() && copyEvents(rawEvents);
+ }
- // Read the list of subscriptions *after* reading events from the event store to avoid race conditions with
+ @VisibleForTesting
+ boolean copyEvents(List rawEvents) {
+ // Read the list of subscriptions *after* reading events from the event store to avoid race conditions with
// creating a new subscription.
- Collection subscriptions = _subscriptionsSupplier.get();
+ Collection subscriptions = _subscriptionsSupplier.get();
// Copy the events to all the destination channels.
List eventKeys = Lists.newArrayListWithCapacity(rawEvents.size());
@@ -139,8 +145,10 @@ private boolean copyEvents() {
}
// Copy to subscriptions in the current data center.
- for (Subscription subscription : _subscriptionEvaluator.matches(subscriptions, matchEventData)) {
- eventsByChannel.put(subscription.getName(), eventData);
+ for (OwnedSubscription subscription : _subscriptionEvaluator.matches(subscriptions, matchEventData)) {
+ if (_databusAuthorizer.owner(subscription.getOwnerId()).canReceiveEventsFromTable(matchEventData.getTable().getName())) {
+ eventsByChannel.put(subscription.getName(), eventData);
+ }
}
// Copy to queues for eventual delivery to remote data centers.
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultFanoutManager.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultFanoutManager.java
index c54b9a6ffc..4177b47f68 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultFanoutManager.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/DefaultFanoutManager.java
@@ -6,8 +6,9 @@
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.ServiceFailureListener;
import com.bazaarvoice.emodb.databus.ChannelNames;
import com.bazaarvoice.emodb.databus.DatabusZooKeeper;
-import com.bazaarvoice.emodb.databus.api.Subscription;
+import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
import com.bazaarvoice.emodb.databus.repl.ReplicationEventSource;
import com.bazaarvoice.emodb.databus.repl.ReplicationSource;
import com.bazaarvoice.emodb.datacenter.api.DataCenter;
@@ -43,16 +44,19 @@ public class DefaultFanoutManager implements FanoutManager {
private final LeaderServiceTask _dropwizardTask;
private final RateLimitedLogFactory _logFactory;
private final SubscriptionEvaluator _subscriptionEvaluator;
+ private final DatabusAuthorizer _databusAuthorizer;
private final MetricRegistry _metricRegistry;
@Inject
public DefaultFanoutManager(final EventStore eventStore, final SubscriptionDAO subscriptionDao,
- SubscriptionEvaluator subscriptionEvaluator, DataCenters dataCenters,
+ SubscriptionEvaluator subscriptionEvaluator,
+ DatabusAuthorizer databusAuthorizer, DataCenters dataCenters,
@DatabusZooKeeper CuratorFramework curator, @SelfHostAndPort HostAndPort self,
LeaderServiceTask dropwizardTask, RateLimitedLogFactory logFactory, MetricRegistry metricRegistry) {
_eventStore = checkNotNull(eventStore, "eventStore");
_subscriptionDao = checkNotNull(subscriptionDao, "subscriptionDao");
- _subscriptionEvaluator = subscriptionEvaluator;
+ _subscriptionEvaluator = checkNotNull(subscriptionEvaluator, "subscriptionEvaluator");
+ _databusAuthorizer = checkNotNull(databusAuthorizer, "databusAuthorizer");
_dataCenters = checkNotNull(dataCenters, "dataCenters");
_curator = checkNotNull(curator, "curator");
_selfId = checkNotNull(self, "self").toString();
@@ -83,9 +87,9 @@ public Void apply(@Nullable Multimap eventsByChannel) {
return null;
}
};
- final Supplier> subscriptionsSupplier = new Supplier>() {
+ final Supplier> subscriptionsSupplier = new Supplier>() {
@Override
- public Collection get() {
+ public Collection get() {
return _subscriptionDao.getAllSubscriptions();
}
};
@@ -96,7 +100,8 @@ public Collection get() {
@Override
public Service get() {
return new DefaultFanout(name, eventSource, eventSink, replicateOutbound, sleepWhenIdle,
- subscriptionsSupplier, _dataCenters.getSelf(), _logFactory, _subscriptionEvaluator, _metricRegistry);
+ subscriptionsSupplier, _dataCenters.getSelf(), _logFactory, _subscriptionEvaluator,
+ _databusAuthorizer, _metricRegistry);
}
});
ServiceFailureListener.listenTo(leaderService, _metricRegistry);
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/MoveSubscriptionRequest.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/MoveSubscriptionRequest.java
index 86200d5870..daf3e4985c 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/MoveSubscriptionRequest.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/MoveSubscriptionRequest.java
@@ -7,15 +7,23 @@
public class MoveSubscriptionRequest {
+ private final String _ownerId;
private final String _from;
private final String _to;
@JsonCreator
- public MoveSubscriptionRequest(@JsonProperty ("from") String from, @JsonProperty ("to") String to) {
+ public MoveSubscriptionRequest(@JsonProperty ("ownerId") String ownerId,
+ @JsonProperty ("from") String from,
+ @JsonProperty ("to") String to) {
+ _ownerId = checkNotNull(ownerId, "ownerId");
_from = checkNotNull(from, "from");
_to = checkNotNull(to, "to");
}
+ public String getOwnerId() {
+ return _ownerId;
+ }
+
public String getFrom() {
return _from;
}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/OwnerAwareDatabus.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/OwnerAwareDatabus.java
new file mode 100644
index 0000000000..43e0a15aa6
--- /dev/null
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/OwnerAwareDatabus.java
@@ -0,0 +1,85 @@
+package com.bazaarvoice.emodb.databus.core;
+
+import com.bazaarvoice.emodb.databus.api.Event;
+import com.bazaarvoice.emodb.databus.api.MoveSubscriptionStatus;
+import com.bazaarvoice.emodb.databus.api.ReplaySubscriptionStatus;
+import com.bazaarvoice.emodb.databus.api.Subscription;
+import com.bazaarvoice.emodb.databus.api.UnauthorizedSubscriptionException;
+import com.bazaarvoice.emodb.databus.api.UnknownSubscriptionException;
+import com.bazaarvoice.emodb.sor.condition.Condition;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Parallel interface for {@link com.bazaarvoice.emodb.databus.api.Databus} that includes the owner's internal ID
+ * with each request. This class is intended for internal use only and should not be exposed outside the databus
+ * module. External systems that require a databus connection should get one using
+ * {@link DatabusFactory#forOwner(String)}.
+ */
+public interface OwnerAwareDatabus {
+
+ Iterator listSubscriptions(String ownerId, @Nullable String fromSubscriptionExclusive, long limit);
+
+ void subscribe(String ownerId, String subscription, Condition tableFilter, Duration subscriptionTtl, Duration eventTtl)
+ throws UnauthorizedSubscriptionException;
+
+ @Deprecated
+ void subscribe(String ownerId, String subscription, Condition tableFilter, Duration subscriptionTtl, Duration eventTtl, boolean ignoreSuppressedEvents)
+ throws UnauthorizedSubscriptionException;
+
+ void unsubscribe(String ownerId, String subscription)
+ throws UnauthorizedSubscriptionException;
+
+ Subscription getSubscription(String ownerId, String subscription)
+ throws UnknownSubscriptionException, UnauthorizedSubscriptionException;
+
+ long getEventCount(String ownerId, String subscription)
+ throws UnauthorizedSubscriptionException;
+
+ long getEventCountUpTo(String ownerId, String subscription, long limit)
+ throws UnauthorizedSubscriptionException;
+
+ long getClaimCount(String ownerId, String subscription)
+ throws UnauthorizedSubscriptionException;
+
+ List peek(String ownerId, String subscription, int limit)
+ throws UnauthorizedSubscriptionException;
+
+ List poll(String ownerId, String subscription, Duration claimTtl, int limit)
+ throws UnauthorizedSubscriptionException;
+
+ void renew(String ownerId, String subscription, Collection eventKeys, Duration claimTtl)
+ throws UnauthorizedSubscriptionException;
+
+ void acknowledge(String ownerId, String subscription, Collection eventKeys)
+ throws UnauthorizedSubscriptionException;
+
+ String replayAsync(String ownerId, String subscription)
+ throws UnauthorizedSubscriptionException;
+
+ String replayAsyncSince(String ownerId, String subscription, Date since)
+ throws UnauthorizedSubscriptionException;
+
+ ReplaySubscriptionStatus getReplayStatus(String ownerId, String reference)
+ throws UnauthorizedSubscriptionException;
+
+ String moveAsync(String ownerId, String from, String to)
+ throws UnauthorizedSubscriptionException;
+
+ MoveSubscriptionStatus getMoveStatus(String ownerId, String reference)
+ throws UnauthorizedSubscriptionException;
+
+ void injectEvent(String ownerId, String subscription, String table, String key)
+ throws UnauthorizedSubscriptionException;
+
+ void unclaimAll(String ownerId, String subscription)
+ throws UnauthorizedSubscriptionException;
+
+ void purge(String ownerId, String subscription)
+ throws UnauthorizedSubscriptionException;
+}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/ReplaySubscriptionRequest.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/ReplaySubscriptionRequest.java
index eff1c2a69f..f5ddf9f8b4 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/ReplaySubscriptionRequest.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/ReplaySubscriptionRequest.java
@@ -14,17 +14,23 @@
@JsonIgnoreProperties (ignoreUnknown = true)
public class ReplaySubscriptionRequest {
+ private String _ownerId;
private String _subscription;
@Nullable
private Date _since;
@JsonCreator
- public ReplaySubscriptionRequest(@JsonProperty ("subscription") String subscription,
+ public ReplaySubscriptionRequest(@JsonProperty ("ownerId") String ownerId,
+ @JsonProperty ("subscription") String subscription,
@JsonProperty ("since") @Nullable Date since) {
+ _ownerId = checkNotNull(ownerId, "ownerId");
_subscription = checkNotNull(subscription, "subscription");
_since = since;
}
+ public String getOwnerId() {
+ return _ownerId;
+ }
public String getSubscription() {
return _subscription;
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/SubscriptionEvaluator.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/SubscriptionEvaluator.java
index a28c25416d..4d4f2369bd 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/core/SubscriptionEvaluator.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/core/SubscriptionEvaluator.java
@@ -6,7 +6,8 @@
import com.bazaarvoice.emodb.sor.core.DataProvider;
import com.bazaarvoice.emodb.sor.core.UpdateRef;
import com.bazaarvoice.emodb.table.db.Table;
-import com.google.common.collect.Lists;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.slf4j.Logger;
@@ -32,14 +33,14 @@ public SubscriptionEvaluator(DataProvider dataProvider,
_rateLimitedLog = logFactory.from(_log);
}
- public Collection matches(Collection subscriptions, MatchEventData eventData) {
- Collection filteredSubscriptions = Lists.newArrayList();
- for (Subscription subscription : subscriptions) {
- if (matches(subscription, eventData)) {
- filteredSubscriptions.add(subscription);
- }
- }
- return filteredSubscriptions;
+ public Iterable matches(Iterable subscriptions, final MatchEventData eventData) {
+ return FluentIterable.from(subscriptions)
+ .filter(new Predicate() {
+ @Override
+ public boolean apply(Subscription subscription) {
+ return matches(subscription, eventData);
+ }
+ });
}
public boolean matches(Subscription subscription, ByteBuffer eventData) {
@@ -53,7 +54,7 @@ public boolean matches(Subscription subscription, ByteBuffer eventData) {
return matches(subscription, matchEventData);
}
- private boolean matches(Subscription subscription, MatchEventData eventData) {
+ public boolean matches(Subscription subscription, MatchEventData eventData) {
Table table = eventData.getTable();
try {
Map json;
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/db/SubscriptionDAO.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/db/SubscriptionDAO.java
index 21180a720d..ccfc21bc50 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/db/SubscriptionDAO.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/db/SubscriptionDAO.java
@@ -1,6 +1,6 @@
package com.bazaarvoice.emodb.databus.db;
-import com.bazaarvoice.emodb.databus.api.Subscription;
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
import com.bazaarvoice.emodb.sor.condition.Condition;
import org.joda.time.Duration;
@@ -9,12 +9,13 @@
public interface SubscriptionDAO {
- void insertSubscription(String subscription, Condition tableFilter, Duration subscriptionTtl, Duration eventTtl);
+ void insertSubscription(String ownerId, String subscription, Condition tableFilter, Duration subscriptionTtl,
+ Duration eventTtl);
void deleteSubscription(String subscription);
@Nullable
- Subscription getSubscription(String subscription);
+ OwnedSubscription getSubscription(String subscription);
- Collection getAllSubscriptions();
+ Collection getAllSubscriptions();
}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/db/astyanax/AstyanaxSubscriptionDAO.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/db/astyanax/AstyanaxSubscriptionDAO.java
index 49277d149b..065c9176d1 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/db/astyanax/AstyanaxSubscriptionDAO.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/db/astyanax/AstyanaxSubscriptionDAO.java
@@ -3,9 +3,9 @@
import com.bazaarvoice.emodb.common.api.Ttls;
import com.bazaarvoice.emodb.common.cassandra.CassandraKeyspace;
import com.bazaarvoice.emodb.common.json.JsonHelper;
-import com.bazaarvoice.emodb.databus.api.DefaultSubscription;
-import com.bazaarvoice.emodb.databus.api.Subscription;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
+import com.bazaarvoice.emodb.databus.model.DefaultOwnedSubscription;
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
import com.bazaarvoice.emodb.sor.condition.Condition;
import com.bazaarvoice.emodb.sor.condition.Conditions;
import com.codahale.metrics.annotation.Timed;
@@ -49,12 +49,14 @@ public AstyanaxSubscriptionDAO(CassandraKeyspace keyspace) {
@Timed(name = "bv.emodb.databus.AstyanaxSubscriptionDAO.insertSubscription", absolute = true)
@Override
- public void insertSubscription(String subscription, Condition tableFilter,
+ public void insertSubscription(String ownerId, String subscription, Condition tableFilter,
Duration subscriptionTtl, Duration eventTtl) {
- Map json = ImmutableMap.of(
- "filter", tableFilter.toString(),
- "expiresAt", System.currentTimeMillis() + subscriptionTtl.getMillis(),
- "eventTtl", Ttls.toSeconds(eventTtl, 1, Integer.MAX_VALUE));
+ Map json = ImmutableMap.builder()
+ .put("filter", tableFilter.toString())
+ .put("expiresAt", System.currentTimeMillis() + subscriptionTtl.getMillis())
+ .put("eventTtl", Ttls.toSeconds(eventTtl, 1, Integer.MAX_VALUE))
+ .put("ownerId", ownerId)
+ .build();
execute(_keyspace.prepareColumnMutation(CF_SUBSCRIPTION, ROW_KEY, subscription, CL_LOCAL_QUORUM)
.putValue(JsonHelper.asJson(json), Ttls.toSeconds(subscriptionTtl, 1, Integer.MAX_VALUE)));
}
@@ -67,23 +69,25 @@ public void deleteSubscription(String subscription) {
}
@Override
- public Subscription getSubscription(String subscription) {
+ public OwnedSubscription getSubscription(String subscription) {
throw new UnsupportedOperationException(); // CachingSubscriptionDAO should prevent calls to this method.
}
@Timed(name = "bv.emodb.databus.AstyanaxSubscriptionDAO.getAllSubscriptions", absolute = true)
@Override
- public Collection getAllSubscriptions() {
+ public Collection getAllSubscriptions() {
ColumnList columns = execute(_keyspace.prepareQuery(CF_SUBSCRIPTION, CL_LOCAL_QUORUM)
.getKey(ROW_KEY));
- List subscriptions = Lists.newArrayListWithCapacity(columns.size());
+ List subscriptions = Lists.newArrayListWithCapacity(columns.size());
for (Column column : columns) {
String name = column.getName();
Map, ?> json = JsonHelper.fromJson(column.getStringValue(), Map.class);
Condition tableFilter = Conditions.fromString((String) checkNotNull(json.get("filter"), "filter"));
Date expiresAt = new Date(((Number) checkNotNull(json.get("expiresAt"), "expiresAt")).longValue());
Duration eventTtl = Duration.standardSeconds(((Number) checkNotNull(json.get("eventTtl"), "eventTtl")).intValue());
- subscriptions.add(new DefaultSubscription(name, tableFilter, expiresAt, eventTtl));
+ // TODO: Once API keys are fully integrated enforce non-null
+ String ownerId = (String) json.get("ownerId");
+ subscriptions.add(new DefaultOwnedSubscription(name, tableFilter, expiresAt, eventTtl, ownerId));
}
return subscriptions;
}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/db/generic/CachingSubscriptionDAO.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/db/generic/CachingSubscriptionDAO.java
index a05caec211..558718a962 100644
--- a/databus/src/main/java/com/bazaarvoice/emodb/databus/db/generic/CachingSubscriptionDAO.java
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/db/generic/CachingSubscriptionDAO.java
@@ -5,6 +5,7 @@
import com.bazaarvoice.emodb.cachemgr.api.InvalidationScope;
import com.bazaarvoice.emodb.databus.api.Subscription;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
import com.bazaarvoice.emodb.sor.condition.Condition;
import com.google.common.base.Function;
import com.google.common.cache.CacheBuilder;
@@ -30,7 +31,7 @@ public class CachingSubscriptionDAO implements SubscriptionDAO {
private static final String SUBSCRIPTIONS = "subscriptions";
private final SubscriptionDAO _delegate;
- private final LoadingCache> _cache;
+ private final LoadingCache> _cache;
private final CacheHandle _cacheHandle;
@Inject
@@ -42,16 +43,16 @@ public CachingSubscriptionDAO(@CachingSubscriptionDAODelegate SubscriptionDAO de
_cache = CacheBuilder.newBuilder().
expireAfterAccess(10, TimeUnit.MINUTES).
recordStats().
- build(new CacheLoader>() {
+ build(new CacheLoader>() {
@Override
- public Map load(String ignored) throws Exception {
+ public Map load(String ignored) throws Exception {
return indexByName(_delegate.getAllSubscriptions());
}
});
_cacheHandle = cacheRegistry.register("subscriptions", _cache, true);
}
- private Map indexByName(Collection subscriptions) {
+ private Map indexByName(Collection subscriptions) {
return Maps.uniqueIndex(subscriptions, new Function() {
@Override
public String apply(Subscription subscription) {
@@ -61,8 +62,9 @@ public String apply(Subscription subscription) {
}
@Override
- public void insertSubscription(String subscription, Condition tableFilter, Duration subscriptionTtl, Duration eventTtl) {
- _delegate.insertSubscription(subscription, tableFilter, subscriptionTtl, eventTtl);
+ public void insertSubscription(String ownerId, String subscription, Condition tableFilter, Duration subscriptionTtl,
+ Duration eventTtl) {
+ _delegate.insertSubscription(ownerId, subscription, tableFilter, subscriptionTtl, eventTtl);
// Synchronously tell every other server in the cluster to forget what it has cached about subscriptions.
_cacheHandle.invalidate(InvalidationScope.DATA_CENTER, SUBSCRIPTIONS);
@@ -77,12 +79,11 @@ public void deleteSubscription(String subscription) {
}
@Override
- public Subscription getSubscription(String subscription) {
+ public OwnedSubscription getSubscription(String subscription) {
return _cache.getUnchecked(SUBSCRIPTIONS).get(subscription);
}
-
@Override
- public Collection getAllSubscriptions() {
+ public Collection getAllSubscriptions() {
return _cache.getUnchecked(SUBSCRIPTIONS).values();
}
}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/model/DefaultOwnedSubscription.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/model/DefaultOwnedSubscription.java
new file mode 100644
index 0000000000..b6b53ee9fc
--- /dev/null
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/model/DefaultOwnedSubscription.java
@@ -0,0 +1,59 @@
+package com.bazaarvoice.emodb.databus.model;
+
+import com.bazaarvoice.emodb.databus.api.DefaultSubscription;
+import com.bazaarvoice.emodb.databus.api.Subscription;
+import com.bazaarvoice.emodb.sor.condition.Condition;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.joda.time.Duration;
+
+import java.util.Date;
+
+/**
+ * Default implementation of {@link OwnedSubscription}. The JSON serialized version of this class does not include
+ * any ownership information so it is safe to return to client-facing interfaces where a {@link Subscription}
+ * is expected.
+ */
+public class DefaultOwnedSubscription implements OwnedSubscription {
+ private final Subscription _subscription;
+ private final String _ownerID;
+
+ public DefaultOwnedSubscription(String name, Condition tableFilter,
+ Date expiresAt, Duration eventTtl,
+ String ownerID) {
+ _subscription = new DefaultSubscription(name, tableFilter, expiresAt, eventTtl);
+ _ownerID = ownerID;
+ }
+
+ @Override
+ public String getOwnerId() {
+ return _ownerID;
+ }
+
+ @Override
+ public String getName() {
+ return _subscription.getName();
+ }
+
+ @Override
+ public Condition getTableFilter() {
+ return _subscription.getTableFilter();
+ }
+
+ @Override
+ public Date getExpiresAt() {
+ return _subscription.getExpiresAt();
+ }
+
+ @Override
+ public Duration getEventTtl() {
+ return _subscription.getEventTtl();
+ }
+
+ /**
+ * The JSON representation should not include any ownership attributes.
+ */
+ @JsonValue
+ public Subscription getSubscription() {
+ return _subscription;
+ }
+}
diff --git a/databus/src/main/java/com/bazaarvoice/emodb/databus/model/OwnedSubscription.java b/databus/src/main/java/com/bazaarvoice/emodb/databus/model/OwnedSubscription.java
new file mode 100644
index 0000000000..0ba074e906
--- /dev/null
+++ b/databus/src/main/java/com/bazaarvoice/emodb/databus/model/OwnedSubscription.java
@@ -0,0 +1,12 @@
+package com.bazaarvoice.emodb.databus.model;
+
+import com.bazaarvoice.emodb.databus.api.Subscription;
+
+/**
+ * Extension of {@link Subscription} that includes ownership information not part of the public API that are required
+ * for proper maintenance and evaluation.
+ */
+public interface OwnedSubscription extends Subscription {
+
+ String getOwnerId();
+}
diff --git a/databus/src/test/java/com/bazaarvoice/emodb/databus/DatabusModuleTest.java b/databus/src/test/java/com/bazaarvoice/emodb/databus/DatabusModuleTest.java
index 00b6875e60..2cdf98085b 100644
--- a/databus/src/test/java/com/bazaarvoice/emodb/databus/DatabusModuleTest.java
+++ b/databus/src/test/java/com/bazaarvoice/emodb/databus/DatabusModuleTest.java
@@ -11,7 +11,8 @@
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.SimpleLifeCycleRegistry;
import com.bazaarvoice.emodb.common.dropwizard.service.EmoServiceMode;
import com.bazaarvoice.emodb.common.dropwizard.task.TaskRegistry;
-import com.bazaarvoice.emodb.databus.api.Databus;
+import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
+import com.bazaarvoice.emodb.databus.core.DatabusFactory;
import com.bazaarvoice.emodb.datacenter.api.DataCenters;
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobService;
@@ -54,14 +55,14 @@ public class DatabusModuleTest {
public void testWebServer() {
Injector injector = createInjector(EmoServiceMode.STANDARD_ALL);
- assertNotNull(injector.getInstance(Databus.class));
+ assertNotNull(injector.getInstance(DatabusFactory.class));
}
@Test
public void testCliTool() {
Injector injector = createInjector(EmoServiceMode.CLI_TOOL);
- assertNotNull(injector.getInstance(Databus.class));
+ assertNotNull(injector.getInstance(DatabusFactory.class));
}
private Injector createInjector(final EmoServiceMode serviceMode) {
@@ -103,12 +104,14 @@ protected void configure() {
bind(CuratorFramework.class).annotatedWith(DatabusZooKeeper.class).toInstance(curator);
bind(HostDiscovery.class).annotatedWith(DatabusHostDiscovery.class).toInstance(mock(HostDiscovery.class));
bind(String.class).annotatedWith(ReplicationKey.class).toInstance("password");
+ bind(String.class).annotatedWith(SystemInternalId.class).toInstance("system");
bind(new TypeLiteral>(){}).annotatedWith(DatabusClusterInfo.class)
.toInstance(ImmutableList.of(new ClusterInfo("Test Cluster", "Test Metric Cluster")));
bind(JobService.class).toInstance(mock(JobService.class));
bind(JobHandlerRegistry.class).toInstance(mock(JobHandlerRegistry.class));
bind(new TypeLiteral>(){}).annotatedWith(DefaultJoinFilter.class)
.toInstance(Suppliers.ofInstance(Conditions.alwaysFalse()));
+ bind(DatabusAuthorizer.class).toInstance(mock(DatabusAuthorizer.class));
MetricRegistry metricRegistry = new MetricRegistry();
bind(MetricRegistry.class).toInstance(metricRegistry);
diff --git a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/ConsolidationTest.java b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/ConsolidationTest.java
index 0927664c46..3c8981c57b 100644
--- a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/ConsolidationTest.java
+++ b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/ConsolidationTest.java
@@ -2,8 +2,9 @@
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
-import com.bazaarvoice.emodb.databus.api.Databus;
import com.bazaarvoice.emodb.databus.api.Event;
+import com.bazaarvoice.emodb.databus.auth.ConstantDatabusAuthorizer;
+import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
import com.bazaarvoice.emodb.event.api.EventData;
import com.bazaarvoice.emodb.event.api.EventSink;
@@ -58,9 +59,9 @@ public boolean poll(String subscription, Duration claimTtl, EventSink sink) {
}
};
Map content = entity("table", "key", ImmutableMap.of("rating", "5"));
- Databus databus = newDatabus(eventStore, new TestDataProvider().add(content));
+ OwnerAwareDatabus databus = newDatabus(eventStore, new TestDataProvider().add(content));
- List events = databus.poll("test-subscription", Duration.standardSeconds(30), 1);
+ List events = databus.poll("id", "test-subscription", Duration.standardSeconds(30), 1);
Event first = events.get(0);
assertEquals(first.getContent(), content);
@@ -87,9 +88,9 @@ public boolean poll(String subscription, Duration claimTtl, EventSink sink) {
}
};
Map content = entity("table", "key", ImmutableMap.of("rating", "5"));
- Databus databus = newDatabus(eventStore, new TestDataProvider().add(content));
+ OwnerAwareDatabus databus = newDatabus(eventStore, new TestDataProvider().add(content));
- List events = databus.poll("test-subscription", Duration.standardSeconds(30), 1);
+ List events = databus.poll("id", "test-subscription", Duration.standardSeconds(30), 1);
Event first = events.get(0);
assertEquals(first.getContent(), content);
@@ -118,9 +119,9 @@ public boolean poll(String subscription, Duration claimTtl, EventSink sink) {
}
};
Map content = entity("table", "key", ImmutableMap.of("rating", "5"));
- Databus databus = newDatabus(eventStore, new TestDataProvider().add(content));
+ OwnerAwareDatabus databus = newDatabus(eventStore, new TestDataProvider().add(content));
- List events = databus.poll("test-subscription", Duration.standardSeconds(30), 1);
+ List events = databus.poll("id", "test-subscription", Duration.standardSeconds(30), 1);
Event first = events.get(0);
assertEquals(first.getContent(), content);
@@ -152,9 +153,9 @@ public boolean poll(String subscription, Duration claimTtl, EventSink sink) {
}
};
Map content = entity("table", "key", ImmutableMap.of("rating", "5"));
- Databus databus = newDatabus(eventStore, new TestDataProvider().add(content));
+ OwnerAwareDatabus databus = newDatabus(eventStore, new TestDataProvider().add(content));
- List events = databus.poll("test-subscription", Duration.standardSeconds(30), 1);
+ List events = databus.poll("id", "test-subscription", Duration.standardSeconds(30), 1);
Event first = events.get(0);
assertEquals(first.getContent(), content);
@@ -202,7 +203,7 @@ public boolean poll(String subscription, Duration claimTtl, EventSink sink) {
DefaultDatabus databus = newDatabus(eventStore, new TestDataProvider().add(content), clock);
// Use a limit of 2 to force multiple calls to the event store.
- List events = databus.poll("test-subscription", Duration.standardSeconds(30), 2);
+ List events = databus.poll("id", "test-subscription", Duration.standardSeconds(30), 2);
Event first = events.get(0);
assertEquals(first.getContent(), content);
@@ -227,9 +228,10 @@ private DefaultDatabus newDatabus(DatabusEventStore eventStore, DataProvider dat
SubscriptionEvaluator subscriptionEvaluator = mock(SubscriptionEvaluator.class);
JobService jobService = mock(JobService.class);
JobHandlerRegistry jobHandlerRegistry = mock(JobHandlerRegistry.class);
+ DatabusAuthorizer databusAuthorizer = ConstantDatabusAuthorizer.ALLOW_ALL;
return new DefaultDatabus(lifeCycle, eventBus, dataProvider, subscriptionDao, eventStore, subscriptionEvaluator,
- jobService, jobHandlerRegistry, new MetricRegistry(), Suppliers.ofInstance(Conditions.alwaysFalse()),
- clock);
+ jobService, jobHandlerRegistry, databusAuthorizer, "replication",
+ Suppliers.ofInstance(Conditions.alwaysFalse()), new MetricRegistry(), clock);
}
private static EventData newEvent(final String id, String table, String key, UUID changeId) {
diff --git a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DatabusChannelConfigurationTest.java b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DatabusChannelConfigurationTest.java
index 103e3d17e1..37375b4f5f 100644
--- a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DatabusChannelConfigurationTest.java
+++ b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DatabusChannelConfigurationTest.java
@@ -1,9 +1,9 @@
package com.bazaarvoice.emodb.databus.core;
import com.bazaarvoice.emodb.databus.ChannelNames;
-import com.bazaarvoice.emodb.databus.api.DefaultSubscription;
-import com.bazaarvoice.emodb.databus.api.Subscription;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
+import com.bazaarvoice.emodb.databus.model.DefaultOwnedSubscription;
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
import com.bazaarvoice.emodb.sor.condition.Conditions;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@@ -18,9 +18,9 @@
public class DatabusChannelConfigurationTest {
@Test
public void testGetTTLForReplay() {
- Subscription replaySubscription = new DefaultSubscription(ChannelNames.getMasterReplayChannel(),
+ OwnedSubscription replaySubscription = new DefaultOwnedSubscription(ChannelNames.getMasterReplayChannel(),
Conditions.alwaysTrue(), new Date(DateTime.now().plus(Duration.standardDays(3650)).getMillis()),
- DatabusChannelConfiguration.REPLAY_TTL);
+ DatabusChannelConfiguration.REPLAY_TTL, "id");
SubscriptionDAO mockSubscriptionDao = mock(SubscriptionDAO.class);
when(mockSubscriptionDao.getSubscription(ChannelNames.getMasterReplayChannel())).thenReturn(replaySubscription);
DatabusChannelConfiguration dbusConf =
diff --git a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DatabusSizeCachingTest.java b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DatabusSizeCachingTest.java
index 7f3df9bfb6..8b10fedc81 100644
--- a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DatabusSizeCachingTest.java
+++ b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DatabusSizeCachingTest.java
@@ -1,6 +1,7 @@
package com.bazaarvoice.emodb.databus.core;
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
+import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobService;
@@ -44,7 +45,8 @@ public void testSizeCache() {
DefaultDatabus testDatabus = new DefaultDatabus(
mock(LifeCycleRegistry.class), mock(EventBus.class), mock(DataProvider.class), mock(SubscriptionDAO.class),
mockEventStore, mock(SubscriptionEvaluator.class), mock(JobService.class), mock(JobHandlerRegistry.class),
- mock(MetricRegistry.class), Suppliers.ofInstance(Conditions.alwaysFalse()), clock);
+ mock(DatabusAuthorizer.class), "replication", Suppliers.ofInstance(Conditions.alwaysFalse()),
+ mock(MetricRegistry.class), clock);
// At limit=500, size estimate should be at 4800
// At limit=50, size estimate should be at 5000
@@ -52,24 +54,24 @@ mockEventStore, mock(SubscriptionEvaluator.class), mock(JobService.class), mock(
when(mockEventStore.getSizeEstimate("testsubscription", 50L)).thenReturn(5000L);
// Let's get the size estimate with limit=50
- long size = testDatabus.getEventCountUpTo("testsubscription", 50L);
+ long size = testDatabus.getEventCountUpTo("id", "testsubscription", 50L);
assertEquals(size, 5000L, "Size should be 5000");
verify(mockEventStore, times(1)).getSizeEstimate("testsubscription", 50L);
// verify no more interaction for the second call within 15 seconds
- size = testDatabus.getEventCountUpTo("testsubscription", 50L);
+ size = testDatabus.getEventCountUpTo("id", "testsubscription", 50L);
assertEquals(size, 5000L, "Size should be 5000");
verifyNoMoreInteractions(mockEventStore);
// verify that it does interact if the accuracy is increased limit=500
- size = testDatabus.getEventCountUpTo("testsubscription", 500L);
+ size = testDatabus.getEventCountUpTo("id", "testsubscription", 500L);
assertEquals(size, 4800L, "Size should be 4800");
verify(mockEventStore, times(1)).getSizeEstimate("testsubscription", 500L);
// verify that it does *not* interact if the accuracy is decreased limit=50 over the next 14 seconds
for (int i=1; i <= 14; i++) {
when(clock.millis()).thenReturn(start + TimeUnit.SECONDS.toMillis(i));
- size = testDatabus.getEventCountUpTo("testsubscription", 50L);
+ size = testDatabus.getEventCountUpTo("id", "testsubscription", 50L);
assertEquals(size, 4800L, "Size should still be 4800");
verifyNoMoreInteractions(mockEventStore);
}
@@ -77,7 +79,7 @@ mockEventStore, mock(SubscriptionEvaluator.class), mock(JobService.class), mock(
// Simulate one more second elapsed, making the total 15
when(clock.millis()).thenReturn(start + TimeUnit.SECONDS.toMillis(15));
- size = testDatabus.getEventCountUpTo("testsubscription", 50L);
+ size = testDatabus.getEventCountUpTo("id", "testsubscription", 50L);
assertEquals(size, 5000L, "Size should be 5000");
// By now it should've interacted twice in the entire testing cycle
verify(mockEventStore, times(2)).getSizeEstimate("testsubscription", 50L);
diff --git a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DefaultDatabusTest.java b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DefaultDatabusTest.java
index d7e0c1af25..5b2ff97d18 100644
--- a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DefaultDatabusTest.java
+++ b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DefaultDatabusTest.java
@@ -1,6 +1,7 @@
package com.bazaarvoice.emodb.databus.core;
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
+import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobService;
@@ -37,24 +38,27 @@ public void testSubscriptionCreation() {
DefaultDatabus testDatabus = new DefaultDatabus(
mock(LifeCycleRegistry.class), mock(EventBus.class), mock(DataProvider.class), mockSubscriptionDao,
mock(DatabusEventStore.class), mock(SubscriptionEvaluator.class), mock(JobService.class),
- mock(JobHandlerRegistry.class), mock(MetricRegistry.class), ignoreReEtl, Clock.systemUTC());
+ mock(JobHandlerRegistry.class), mock(DatabusAuthorizer.class), "replication", ignoreReEtl,
+ mock(MetricRegistry.class), Clock.systemUTC());
Condition originalCondition = Conditions.mapBuilder().contains("foo", "bar").build();
- testDatabus.subscribe("test-subscription", originalCondition, Duration.standardDays(7),
+ testDatabus.subscribe("id", "test-subscription", originalCondition, Duration.standardDays(7),
Duration.standardDays(7));
// Skip databus events tagged with "re-etl"
Condition skipIgnoreTags = Conditions.not(Conditions.mapBuilder().matches(UpdateRef.TAGS_NAME, Conditions.containsAny("re-etl")).build());
Condition expectedConditionToSkipIgnore = Conditions.and(originalCondition, skipIgnoreTags);
- verify(mockSubscriptionDao).insertSubscription("test-subscription", expectedConditionToSkipIgnore,
+ verify(mockSubscriptionDao).insertSubscription("id", "test-subscription", expectedConditionToSkipIgnore,
Duration.standardDays(7), Duration.standardDays(7));
+ verify(mockSubscriptionDao).getSubscription("test-subscription");
verifyNoMoreInteractions(mockSubscriptionDao);
// reset mocked subscription DAO so it doesn't carry information about old interactions
reset(mockSubscriptionDao);
// Test condition is unchanged if includeDefaultJoinFilter is set to false
- testDatabus.subscribe("test-subscription", originalCondition, Duration.standardDays(7),
+ testDatabus.subscribe("id", "test-subscription", originalCondition, Duration.standardDays(7),
Duration.standardDays(7), false);
- verify(mockSubscriptionDao).insertSubscription("test-subscription", originalCondition, Duration.standardDays(7),
+ verify(mockSubscriptionDao).insertSubscription("id", "test-subscription", originalCondition, Duration.standardDays(7),
Duration.standardDays(7));
+ verify(mockSubscriptionDao).getSubscription("test-subscription");
verifyNoMoreInteractions(mockSubscriptionDao);
}
}
diff --git a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DefaultFanoutTest.java b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DefaultFanoutTest.java
new file mode 100644
index 0000000000..5e2292a971
--- /dev/null
+++ b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/DefaultFanoutTest.java
@@ -0,0 +1,169 @@
+package com.bazaarvoice.emodb.databus.core;
+
+import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
+import com.bazaarvoice.emodb.databus.ChannelNames;
+import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
+import com.bazaarvoice.emodb.databus.model.DefaultOwnedSubscription;
+import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
+import com.bazaarvoice.emodb.datacenter.api.DataCenter;
+import com.bazaarvoice.emodb.event.api.EventData;
+import com.bazaarvoice.emodb.sor.api.Intrinsic;
+import com.bazaarvoice.emodb.sor.api.TableOptionsBuilder;
+import com.bazaarvoice.emodb.sor.condition.Conditions;
+import com.bazaarvoice.emodb.sor.core.DataProvider;
+import com.bazaarvoice.emodb.sor.core.UpdateRef;
+import com.bazaarvoice.emodb.table.db.Table;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Date;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+@SuppressWarnings("unchecked")
+public class DefaultFanoutTest {
+
+ private DefaultFanout _defaultFanout;
+ private Supplier> _subscriptionsSupplier;
+ private DataCenter _currentDataCenter;
+ private DataCenter _remoteDataCenter;
+ private DataProvider _dataProvider;
+ private DatabusAuthorizer _databusAuthorizer;
+ private String _remoteChannel;
+ private Multimap _eventsSinked;
+
+ @BeforeMethod
+ private void setUp() {
+ _eventsSinked = ArrayListMultimap.create();
+
+ Function, Void> eventSink = new Function, Void>() {
+ @Override
+ public Void apply(Multimap input) {
+ _eventsSinked.putAll(input);
+ return null;
+ }
+ };
+
+ _subscriptionsSupplier = mock(Supplier.class);
+ _currentDataCenter = mock(DataCenter.class);
+ when(_currentDataCenter.getName()).thenReturn("local");
+ _remoteDataCenter = mock(DataCenter.class);
+ when(_remoteDataCenter.getName()).thenReturn("remote");
+ _remoteChannel = ChannelNames.getReplicationFanoutChannel(_remoteDataCenter);
+
+ RateLimitedLogFactory rateLimitedLogFactory = mock(RateLimitedLogFactory.class);
+ when(rateLimitedLogFactory.from(any(Logger.class))).thenReturn(mock(RateLimitedLog.class));
+
+ _dataProvider = mock(DataProvider.class);
+ _databusAuthorizer = mock(DatabusAuthorizer.class);
+
+ SubscriptionEvaluator subscriptionEvaluator = new SubscriptionEvaluator(_dataProvider, rateLimitedLogFactory);
+
+ _defaultFanout = new DefaultFanout("test", mock(EventSource.class), eventSink, true, Duration.standardSeconds(1),
+ _subscriptionsSupplier, _currentDataCenter, rateLimitedLogFactory, subscriptionEvaluator,
+ _databusAuthorizer, new MetricRegistry());
+ }
+
+ @Test
+ public void testMatchingTable() {
+ addTable("matching-table");
+
+ OwnedSubscription subscription = new DefaultOwnedSubscription(
+ "test", Conditions.intrinsic(Intrinsic.TABLE, Conditions.equal("matching-table")),
+ new Date(), Duration.standardDays(1), "owner0");
+
+ EventData event = newEvent("id0", "matching-table", "key0");
+
+ when(_subscriptionsSupplier.get()).thenReturn(ImmutableList.of(subscription));
+ DatabusAuthorizer.DatabusAuthorizerByOwner authorizerByOwner = mock(DatabusAuthorizer.DatabusAuthorizerByOwner.class);
+ when(authorizerByOwner.canReceiveEventsFromTable("matching-table")).thenReturn(true);
+ when(_databusAuthorizer.owner("owner0")).thenReturn(authorizerByOwner);
+
+ _defaultFanout.copyEvents(ImmutableList.of(event));
+
+ assertEquals(_eventsSinked,
+ ImmutableMultimap.of("test", event.getData(), _remoteChannel, event.getData()));
+ }
+
+ @Test
+ public void testNotMatchingTable() {
+ addTable("other-table");
+
+ OwnedSubscription subscription = new DefaultOwnedSubscription(
+ "test", Conditions.intrinsic(Intrinsic.TABLE, Conditions.equal("not-matching-table")),
+ new Date(), Duration.standardDays(1), "owner0");
+
+ EventData event = newEvent("id0", "other-table", "key0");
+
+ when(_subscriptionsSupplier.get()).thenReturn(ImmutableList.of(subscription));
+ DatabusAuthorizer.DatabusAuthorizerByOwner authorizerByOwner = mock(DatabusAuthorizer.DatabusAuthorizerByOwner.class);
+ when(authorizerByOwner.canReceiveEventsFromTable("matching-table")).thenReturn(true);
+ when(_databusAuthorizer.owner("owner0")).thenReturn(authorizerByOwner);
+
+ _defaultFanout.copyEvents(ImmutableList.of(event));
+
+ // Event does not match subscription, should only go to remote fanout
+ assertEquals(_eventsSinked,
+ ImmutableMultimap.of(_remoteChannel, event.getData()));
+ }
+
+ @Test
+ public void testUnauthorizedFanout() {
+ addTable("unauthorized-table");
+
+ OwnedSubscription subscription = new DefaultOwnedSubscription(
+ "test", Conditions.intrinsic(Intrinsic.TABLE, Conditions.equal("unauthorized-table")),
+ new Date(), Duration.standardDays(1), "owner0");
+
+ EventData event = newEvent("id0", "unauthorized-table", "key0");
+
+ when(_subscriptionsSupplier.get()).thenReturn(ImmutableList.of(subscription));
+ DatabusAuthorizer.DatabusAuthorizerByOwner authorizerByOwner = mock(DatabusAuthorizer.DatabusAuthorizerByOwner.class);
+ when(authorizerByOwner.canReceiveEventsFromTable("matching-table")).thenReturn(false);
+ when(_databusAuthorizer.owner("owner0")).thenReturn(authorizerByOwner);
+
+ _defaultFanout.copyEvents(ImmutableList.of(event));
+
+ // Event is not authorized for owner, should only go to remote fanout
+ assertEquals(_eventsSinked,
+ ImmutableMultimap.of(_remoteChannel, event.getData()));
+
+ }
+
+ private void addTable(String tableName) {
+ Table table = mock(Table.class);
+ when(table.getName()).thenReturn(tableName);
+ when(table.getAttributes()).thenReturn(ImmutableMap.of());
+ when(table.getOptions()).thenReturn(new TableOptionsBuilder().setPlacement("placement").build());
+ // Put in another data center to force replication
+ when(table.getDataCenters()).thenReturn(ImmutableList.of(_currentDataCenter, _remoteDataCenter));
+ when(_dataProvider.getTable(tableName)).thenReturn(table);
+ }
+
+ private EventData newEvent(String id, String table, String key) {
+ EventData eventData = mock(EventData.class);
+ when(eventData.getId()).thenReturn(id);
+
+ UpdateRef updateRef = new UpdateRef(table, key, TimeUUIDs.newUUID(), ImmutableSet.of());
+ ByteBuffer data = UpdateRefSerializer.toByteBuffer(updateRef);
+ when(eventData.getData()).thenReturn(data);
+
+ return eventData;
+ }
+}
diff --git a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/ReplayRequestTest.java b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/ReplayRequestTest.java
index cf35ba9ace..1edbd1de79 100644
--- a/databus/src/test/java/com/bazaarvoice/emodb/databus/core/ReplayRequestTest.java
+++ b/databus/src/test/java/com/bazaarvoice/emodb/databus/core/ReplayRequestTest.java
@@ -15,12 +15,13 @@ public class ReplayRequestTest {
@Test
public void testReplayRequestJson() {
String json = "{" +
- "\"subscription\":\"test\"}";
+ "\"ownerId\":\"123\",\"subscription\":\"test\"}";
ReplaySubscriptionRequest request = JsonHelper.fromJson(json, ReplaySubscriptionRequest.class);
assertEquals(JsonHelper.asJson(request), json, "Json representation without 'since' looks good");
SimpleDateFormat dateFmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZZ");
dateFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
String jsonWithSince = "{" +
+ "\"ownerId\":\"123\"," +
"\"subscription\":\"test\"," +
"\"since\":\"" + dateFmt.format(new Date()) + "\"}";
request = JsonHelper.fromJson(jsonWithSince, ReplaySubscriptionRequest.class);
diff --git a/datacenter/src/main/java/com/bazaarvoice/emodb/datacenter/core/DefaultDataCenters.java b/datacenter/src/main/java/com/bazaarvoice/emodb/datacenter/core/DefaultDataCenters.java
index af536f02bd..055b44b09e 100644
--- a/datacenter/src/main/java/com/bazaarvoice/emodb/datacenter/core/DefaultDataCenters.java
+++ b/datacenter/src/main/java/com/bazaarvoice/emodb/datacenter/core/DefaultDataCenters.java
@@ -38,6 +38,16 @@ public DefaultDataCenters(DataCenterDAO dataCenterDao,
refresh();
}
+ /**
+ * DefaultDataCenters doesn't actually directly require DataCenterAnnouncer. However, it is frequently the case
+ * that classes that depend on DefaultDataCenters will only operate correctly if the DataCenterAnnouncer has been
+ * started first. The following false dependency forces this injection order when appropriate.
+ */
+ @Inject(optional=true)
+ private void injectDataCenterAnnouncer(DataCenterAnnouncer ignore) {
+ // no-op
+ }
+
@Override
public void refresh() {
_cache = Suppliers.memoizeWithExpiration(new Supplier() {
@@ -65,7 +75,7 @@ public DataCenter getSystem() {
private DataCenter get(String name) {
DataCenter dataCenter = _cache.get().get(name);
- checkArgument(dataCenter != null, "Unknown data center: {}", name);
+ checkArgument(dataCenter != null, "Unknown data center: %s", name);
return dataCenter;
}
diff --git a/quality/integration/src/test/java/test/integration/databus/CasDatabusTest.java b/quality/integration/src/test/java/test/integration/databus/CasDatabusTest.java
index 2ed25c5d82..54b77138d2 100644
--- a/quality/integration/src/test/java/test/integration/databus/CasDatabusTest.java
+++ b/quality/integration/src/test/java/test/integration/databus/CasDatabusTest.java
@@ -13,13 +13,16 @@
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.SimpleLifeCycleRegistry;
import com.bazaarvoice.emodb.common.dropwizard.service.EmoServiceMode;
import com.bazaarvoice.emodb.common.dropwizard.task.TaskRegistry;
+import com.bazaarvoice.emodb.databus.SystemInternalId;
import com.bazaarvoice.emodb.databus.DatabusConfiguration;
import com.bazaarvoice.emodb.databus.DatabusHostDiscovery;
import com.bazaarvoice.emodb.databus.DatabusModule;
import com.bazaarvoice.emodb.databus.DatabusZooKeeper;
import com.bazaarvoice.emodb.databus.DefaultJoinFilter;
import com.bazaarvoice.emodb.databus.ReplicationKey;
-import com.bazaarvoice.emodb.databus.api.Databus;
+import com.bazaarvoice.emodb.databus.auth.ConstantDatabusAuthorizer;
+import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
+import com.bazaarvoice.emodb.databus.core.DatabusFactory;
import com.bazaarvoice.emodb.datacenter.DataCenterConfiguration;
import com.bazaarvoice.emodb.datacenter.DataCenterModule;
import com.bazaarvoice.emodb.datacenter.api.KeyspaceDiscovery;
@@ -74,7 +77,7 @@
public class CasDatabusTest {
private SimpleLifeCycleRegistry _lifeCycle;
private HealthCheckRegistry _healthChecks;
- private Databus _bus;
+ private DatabusFactory _bus;
@BeforeClass
public void setup() throws Exception {
@@ -143,6 +146,8 @@ protected void configure() {
bind(JobService.class).toInstance(mock(JobService.class));
bind(JobHandlerRegistry.class).toInstance(mock(JobHandlerRegistry.class));
+ bind(DatabusAuthorizer.class).toInstance(ConstantDatabusAuthorizer.ALLOW_ALL);
+ bind(String.class).annotatedWith(SystemInternalId.class).toInstance("system");
bind(new TypeLiteral>(){}).annotatedWith(DefaultJoinFilter.class)
.toInstance(Suppliers.ofInstance(Conditions.alwaysFalse()));
@@ -156,7 +161,7 @@ protected void configure() {
install(new DatabusModule(serviceMode, metricRegistry));
}
});
- _bus = injector.getInstance(Databus.class);
+ _bus = injector.getInstance(DatabusFactory.class);
_lifeCycle.start();
}
diff --git a/quality/integration/src/test/java/test/integration/databus/DatabusJerseyTest.java b/quality/integration/src/test/java/test/integration/databus/DatabusJerseyTest.java
index 9c4c03a839..d3405eaf13 100644
--- a/quality/integration/src/test/java/test/integration/databus/DatabusJerseyTest.java
+++ b/quality/integration/src/test/java/test/integration/databus/DatabusJerseyTest.java
@@ -2,7 +2,7 @@
import com.bazaarvoice.emodb.auth.apikey.ApiKey;
import com.bazaarvoice.emodb.auth.apikey.ApiKeyRequest;
-import com.bazaarvoice.emodb.client.EmoClientException;
+import com.bazaarvoice.emodb.auth.jersey.Subject;
import com.bazaarvoice.emodb.common.api.UnauthorizedException;
import com.bazaarvoice.emodb.common.jersey.dropwizard.JerseyEmoClient;
import com.bazaarvoice.emodb.common.json.JsonHelper;
@@ -14,16 +14,19 @@
import com.bazaarvoice.emodb.databus.api.MoveSubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.ReplaySubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.Subscription;
+import com.bazaarvoice.emodb.databus.api.UnauthorizedSubscriptionException;
import com.bazaarvoice.emodb.databus.api.UnknownSubscriptionException;
import com.bazaarvoice.emodb.databus.client.DatabusAuthenticator;
import com.bazaarvoice.emodb.databus.client.DatabusClient;
import com.bazaarvoice.emodb.databus.core.DatabusChannelConfiguration;
import com.bazaarvoice.emodb.databus.core.DatabusEventStore;
+import com.bazaarvoice.emodb.databus.core.DatabusFactory;
import com.bazaarvoice.emodb.sor.api.Intrinsic;
import com.bazaarvoice.emodb.sor.condition.Condition;
import com.bazaarvoice.emodb.sor.condition.Conditions;
import com.bazaarvoice.emodb.sor.core.UpdateRef;
import com.bazaarvoice.emodb.test.ResourceTest;
+import com.bazaarvoice.emodb.web.resources.databus.DatabusClientSubjectProxy;
import com.bazaarvoice.emodb.web.resources.databus.DatabusResource1;
import com.bazaarvoice.emodb.web.resources.databus.DatabusResourcePoller;
import com.bazaarvoice.emodb.web.resources.databus.LongPollingExecutorServices;
@@ -38,10 +41,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
-import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.spi.inject.SingletonTypeInjectableProvider;
import io.dropwizard.testing.junit.ResourceTestRule;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.After;
@@ -73,7 +78,9 @@
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -87,41 +94,66 @@
*/
public class DatabusJerseyTest extends ResourceTest {
private static final String APIKEY_DATABUS = "databus-key";
+ private static final String INTERNAL_ID_DATABUS = "databus-id";
private static final String APIKEY_UNAUTHORIZED = "unauthorized-key";
+ private static final String INTERNAL_ID_UNAUTHORIZED = "unauthorized-id";
private final PartitionContextValidator _pcxtv =
OstrichAccessors.newPartitionContextTest(AuthDatabus.class, DatabusClient.class);
+ private final DatabusFactory _factory = mock(DatabusFactory.class);
private final Databus _server = mock(Databus.class);
- private final AuthDatabus _proxy = mock(AuthDatabus.class);
+ private final DatabusClientSubjectProxy _proxyProvider = mock(DatabusClientSubjectProxy.class);
+ private final Databus _proxy = mock(Databus.class);
@Rule
public ResourceTestRule _resourceTestRule = setupResourceTestRule(
- Collections.