diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/DistributedSubImpl.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/DistributedSubImpl.java index b113ae3bc0..13242db4ff 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/DistributedSubImpl.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/DistributedSubImpl.java @@ -53,15 +53,16 @@ final class DistributedSubImpl implements DistributedSub { @Override public CompletionStage subscribeWithFilterAndAck(final Collection topics, final ActorRef subscriber, final Predicate> filter) { + // TODO: check group is non-empty if non-null final Subscribe subscribe = - Subscribe.of(new HashSet<>(topics), subscriber, writeAll, true, filter); + Subscribe.of(new HashSet<>(topics), subscriber, writeAll, true, filter, null); return askSubSupervisor(subscribe); } @Override public CompletionStage subscribeWithAck(final Collection topics, final ActorRef subscriber) { - return askSubSupervisor(Subscribe.of(new HashSet<>(topics), subscriber, writeAll, true)); + return askSubSupervisor(Subscribe.of(new HashSet<>(topics), subscriber, writeAll, true, null)); } @Override @@ -84,7 +85,7 @@ private CompletionStage askSubSupervisor(final Request request) { public void subscribeWithoutAck(final Collection topics, final ActorRef subscriber) { final Request request = Subscribe.of(new HashSet<>(topics), subscriber, - (Replicator.WriteConsistency) Replicator.writeLocal(), false); + (Replicator.WriteConsistency) Replicator.writeLocal(), false, null); subSupervisor.tell(request, subscriber); } diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AbstractUpdater.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AbstractUpdater.java index 08a1ea1982..41520e8655 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AbstractUpdater.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AbstractUpdater.java @@ -51,9 +51,6 @@ */ abstract class AbstractUpdater extends AbstractActorWithTimers { - // pseudo-random number generator for force updates. quality matters little. - private final Random random = new Random(); - protected final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this); protected final PubSubConfig config; @@ -160,13 +157,6 @@ protected void updateFailure(final Status.Failure failure) { localSubscriptionsChanged = true; } - /** - * @return whether the next update is a random force update. - */ - protected boolean forceUpdate() { - return random.nextDouble() < config.getForceUpdateProbability(); - } - /** * Add a request to the queue to be handled after cluster update. * diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdater.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdater.java index 5108fc84e8..80dfa1ca3c 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdater.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdater.java @@ -165,13 +165,14 @@ private boolean isAllowedRemotelyBy(final Grouped groupedLabels, private boolean isAllowedRemotelyBy(@Nullable final String group, final Set ackLabels, final Map> remoteGroups, final Predicate isTakenRemotely) { - if (group != null) { + final boolean noConflict = noDeclaredLabelMatches(ackLabels, isTakenRemotely); + if (noConflict && group != null) { final Set remoteGroup = remoteGroups.get(group); if (remoteGroup != null) { return remoteGroup.equals(ackLabels); } } - return noDeclaredLabelMatches(ackLabels, isTakenRemotely); + return noConflict; } private boolean noDeclaredLabelMatches(final Set ackLabels, final Predicate contains) { diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/DDataChanged.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/DDataChanged.java deleted file mode 100644 index ee74d56feb..0000000000 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/DDataChanged.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2020 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.ditto.services.utils.pubsub.actors; - -import java.util.Map; - -import akka.actor.Address; -import scala.collection.immutable.Set; - -/** - * Notification that the distributed data changed. - */ -final class DDataChanged { - - private final Map> multimap; - - DDataChanged(final Map> multimap) { - this.multimap = multimap; - } - - /** - * The changed distributed multimap as a Java map. - * - * @return the changed distributed data. - */ - public Map> getMultiMap() { - return multimap; - } -} diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/Publisher.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/Publisher.java index 8e874a6511..47b48371ca 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/Publisher.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/Publisher.java @@ -15,8 +15,11 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.model.base.acks.AcknowledgementLabel; import org.eclipse.ditto.model.base.acks.AcknowledgementRequest; import org.eclipse.ditto.model.base.entity.id.EntityIdWithType; @@ -28,6 +31,7 @@ import org.eclipse.ditto.services.utils.pubsub.DistributedAcks; import org.eclipse.ditto.services.utils.pubsub.api.RemoteAcksChanged; import org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader; +import org.eclipse.ditto.services.utils.pubsub.ddata.ack.Grouped; import org.eclipse.ditto.services.utils.pubsub.extractors.AckExtractor; import org.eclipse.ditto.signals.acks.base.Acknowledgements; @@ -36,35 +40,33 @@ import akka.actor.Address; import akka.actor.Props; import akka.cluster.ddata.Replicator; +import akka.japi.Pair; import akka.japi.pf.ReceiveBuilder; -import scala.collection.immutable.Set; import scala.jdk.javaapi.CollectionConverters; /** - * Publishes messages according to topic Bloom filters. - * - * @param representation of topics in the distributed data. + * Publishes messages according to topic distributed data. */ -public final class Publisher extends AbstractActor { +public final class Publisher extends AbstractActor { /** * Prefix of this actor's name. */ - public static final String ACTOR_NAME_PREFIX = "publisher"; + public static final java.lang.String ACTOR_NAME_PREFIX = "publisher"; private final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this); - private final DDataReader ddataReader; + private final DDataReader ddataReader; private final Counter messageCounter = DittoMetrics.counter("pubsub-published-messages"); private final Counter topicCounter = DittoMetrics.counter("pubsub-published-topics"); - private Map> topicSubscribers = Map.of(); - private Map> declaredAcks = Map.of(); - private java.util.Set allDeclaredAcks = java.util.Set.of(); + private Map> topicSubscribers = Map.of(); + private Map> declaredAcks = Map.of(); + private Set allDeclaredAcks = Set.of(); @SuppressWarnings("unused") - private Publisher(final DDataReader ddataReader, final DistributedAcks distributedAcks) { + private Publisher(final DDataReader ddataReader, final DistributedAcks distributedAcks) { this.ddataReader = ddataReader; ddataReader.receiveChanges(getSelf()); distributedAcks.receiveDistributedDeclaredAcks(getSelf()); @@ -90,7 +92,7 @@ public static Props props(final DDataReader ddataReader, final * @param message the message to publish. * @return a publish message. */ - public static Request publish(final Collection topics, final Object message) { + public static Request publish(final Collection topics, final Object message) { return new Publish(topics, message); } @@ -104,9 +106,9 @@ public static Request publish(final Collection topics, final Object mess * @param dittoHeaders the Ditto headers of any weak acknowledgements to send back. * @return the request. */ - public static Request publishWithAck(final Collection topics, + public static Request publishWithAck(final Collection topics, final Object message, - final java.util.Set ackRequests, + final Set ackRequests, final EntityIdWithType entityId, final DittoHeaders dittoHeaders) { @@ -130,9 +132,9 @@ private void publish(final Publish publish) { private void publishWithAck(final PublishWithAck publishWithAck) { final Collection subscribers = doPublish(publishWithAck.topics, publishWithAck.message); - final java.util.Set subscriberDeclaredAcks = subscribers.stream() - .map(subscriber -> declaredAcks.getOrDefault(subscriber.path().address(), java.util.Set.of())) - .flatMap(java.util.Set::stream) + final Set subscriberDeclaredAcks = subscribers.stream() + .map(subscriber -> declaredAcks.getOrDefault(subscriber.path().address(), Set.of())) + .flatMap(Set::stream) .collect(Collectors.toSet()); final Collection requestedCustomAcks = @@ -147,12 +149,12 @@ private void publishWithAck(final PublishWithAck publishWithAck) { } } - private Collection doPublish(final Collection topics, final Object message) { + private Collection doPublish(final Collection topics, final Object message) { messageCounter.increment(); topicCounter.increment(topics.size()); - final List hashes = topics.stream().map(ddataReader::approximate).collect(Collectors.toList()); + final List hashes = topics.stream().map(ddataReader::approximate).collect(Collectors.toList()); final ActorRef sender = getSender(); - final Collection subscribers = ddataReader.getSubscribers(topicSubscribers, hashes); + final Collection subscribers = getSubscribers(hashes); subscribers.forEach(subscriber -> subscriber.tell(message, sender)); return subscribers; } @@ -160,18 +162,37 @@ private Collection doPublish(final Collection topics, final Ob private void declaredAcksChanged(final RemoteAcksChanged event) { declaredAcks = event.getMultiMap(); allDeclaredAcks = declaredAcks.values().stream() - .flatMap(java.util.Set::stream) + .flatMap(Set::stream) .collect(Collectors.toSet()); } private void topicSubscribersChanged(final Replicator.Changed event) { - topicSubscribers = CollectionConverters.asJava(event.get(ddataReader.getKey()).entries()); + final Map> mmap = + CollectionConverters.asJava(event.get(ddataReader.getKey()).entries()); + topicSubscribers = mmap.entrySet() + .stream() + .map(entry -> Pair.create(entry.getKey(), deserializeGroupedHashes(entry.getValue()))) + .collect(Collectors.toMap(Pair::first, Pair::second)); } private void logUnhandled(final Object message) { log.warning("Unhandled: <{}>", message); } + private Set getSubscribers(final Collection hashes) { + return topicSubscribers.entrySet() + .stream() + .filter(entry -> hashes.stream().anyMatch(entry.getValue()::contains)) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + + private static Set deserializeGroupedHashes(final scala.collection.immutable.Set strings) { + return CollectionConverters.asJava(strings).stream() + .flatMap(string -> Grouped.fromJson(JsonObject.of(string), JsonValue::asLong).getValues().stream()) + .collect(Collectors.toSet()); + } + /** * Requests to a publisher actor. */ @@ -183,10 +204,10 @@ public interface Request {} */ private static final class Publish implements Request { - private final Collection topics; + private final Collection topics; private final Object message; - private Publish(final Collection topics, final Object message) { + private Publish(final Collection topics, final Object message) { this.topics = topics; this.message = message; } @@ -200,14 +221,14 @@ private static final class PublishWithAck implements Request { private static final AckExtractor ACK_EXTRACTOR = AckExtractor.of(p -> p.entityId, p -> p.dittoHeaders); - private final Collection topics; + private final Collection topics; private final Object message; - private final java.util.Set ackRequests; + private final Set ackRequests; private final EntityIdWithType entityId; private final DittoHeaders dittoHeaders; - private PublishWithAck(final Collection topics, final Object message, - final java.util.Set ackRequests, + private PublishWithAck(final Collection topics, final Object message, + final Set ackRequests, final EntityIdWithType entityId, final DittoHeaders dittoHeaders) { this.topics = topics; diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/SubUpdater.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/SubUpdater.java index 4e168b7fdc..9f12dd9dcd 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/SubUpdater.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/SubUpdater.java @@ -70,7 +70,7 @@ public static Props props(final PubSubConfig config, final ActorRef subscrib @Override protected void subscribe(final Subscribe subscribe) { final boolean changed = - subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(), subscribe.getFilter()); + subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(), subscribe.getFilter(), null); enqueueRequest(subscribe, changed, getSender(), awaitUpdate, awaitUpdateMetric); if (changed) { getContext().watch(subscribe.getSubscriber()); @@ -102,7 +102,7 @@ protected void ddataOpSuccess(final DDataOpSuccess opSucces @Override protected void tick(final Clock tick) { - performDDataOp(forceUpdate(), localSubscriptionsChanged, nextWriteConsistency) + performDDataOp(localSubscriptionsChanged, nextWriteConsistency) .handle(handleDDataWriteResult(getSeqNr(), nextWriteConsistency)); moveAwaitUpdateToAwaitAcknowledge(); } @@ -113,12 +113,11 @@ private void flushSubAcks(final int seqNr) { } } - private CompletionStage performDDataOp(final boolean forceUpdate, - final boolean localSubscriptionsChanged, + private CompletionStage performDDataOp(final boolean localSubscriptionsChanged, final Replicator.WriteConsistency writeConsistency) { final SubscriptionsReader snapshot; final CompletionStage ddataOp; - if (!localSubscriptionsChanged && !forceUpdate) { + if (!localSubscriptionsChanged) { snapshot = subscriptions.snapshot(); ddataOp = CompletableFuture.completedStage(null); } else if (subscriptions.isEmpty()) { @@ -127,7 +126,7 @@ private CompletionStage performDDataOp(final boolean forceU topicMetric.set(0L); } else { // export before taking snapshot so that implementations may output incremental update. - final T ddata = subscriptions.export(forceUpdate); + final T ddata = subscriptions.export(); // take snapshot to give to the subscriber; clear accumulated incremental changes. snapshot = subscriptions.snapshot(); ddataOp = topicsWriter.put(subscriber, ddata, writeConsistency); diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/SubscriptionsChanged.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/SubscriptionsChanged.java deleted file mode 100644 index 66adc78df8..0000000000 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/SubscriptionsChanged.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2020 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.ditto.services.utils.pubsub.actors; - -import org.eclipse.ditto.services.utils.pubsub.ddata.SubscriptionsReader; - -/** - * Local message sent to whomever is interested in changes to subscriptions, containing a snapshot of the current local - * subscriptions. - */ -public final class SubscriptionsChanged { - - private final SubscriptionsReader subscriptionsReader; - - SubscriptionsChanged(final SubscriptionsReader subscriptionsReader) { - this.subscriptionsReader = subscriptionsReader; - } - - /** - * The snapshot of local subscriptions. - * - * @return the snapshot. - */ - public SubscriptionsReader getSubscriptionsReader() { - return subscriptionsReader; - } -} diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/api/Subscribe.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/api/Subscribe.java index 301780136f..3230e45058 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/api/Subscribe.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/api/Subscribe.java @@ -13,9 +13,12 @@ package org.eclipse.ditto.services.utils.pubsub.api; import java.util.Collection; +import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import javax.annotation.Nullable; + import akka.actor.ActorRef; import akka.cluster.ddata.Replicator; @@ -27,14 +30,16 @@ public final class Subscribe extends AbstractRequest { private static final Predicate> CONSTANT_TRUE = topics -> true; private final Predicate> filter; + @Nullable private final String group; private Subscribe(final Set topics, final ActorRef subscriber, final Replicator.WriteConsistency writeConsistency, final boolean acknowledge, - final Predicate> filter) { + final Predicate> filter, @Nullable final String group) { super(topics, subscriber, writeConsistency, acknowledge); this.filter = filter; + this.group = group; } /** @@ -44,13 +49,15 @@ private Subscribe(final Set topics, * @param subscriber who is subscribing. * @param writeConsistency with which write consistency should this subscription be updated. * @param acknowledge whether acknowledgement is desired. + * @param group any group the subscriber belongs to, or null. * @return the request. */ public static Subscribe of(final Set topics, final ActorRef subscriber, final Replicator.WriteConsistency writeConsistency, - final boolean acknowledge) { - return new Subscribe(topics, subscriber, writeConsistency, acknowledge, CONSTANT_TRUE); + final boolean acknowledge, + @Nullable final String group) { + return new Subscribe(topics, subscriber, writeConsistency, acknowledge, CONSTANT_TRUE, group); } /** @@ -61,14 +68,16 @@ public static Subscribe of(final Set topics, * @param writeConsistency with which write consistency should this subscription be updated. * @param acknowledge whether acknowledgement is desired. * @param filter local filter for incoming messages. + * @param group any group the subscriber belongs to, or null. * @return the request. */ public static Subscribe of(final Set topics, final ActorRef subscriber, final Replicator.WriteConsistency writeConsistency, final boolean acknowledge, - final Predicate> filter) { - return new Subscribe(topics, subscriber, writeConsistency, acknowledge, filter); + final Predicate> filter, + @Nullable final String group) { + return new Subscribe(topics, subscriber, writeConsistency, acknowledge, filter, group); } /** @@ -78,4 +87,11 @@ public Predicate> getFilter() { return filter; } + /** + * @return the group the subscriber belongs to, or an empty optional. + */ + public Optional getGroup() { + return Optional.ofNullable(group); + } + } diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/config/DefaultPubSubConfig.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/config/DefaultPubSubConfig.java index 2b05cc9087..49c1d5a8a0 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/config/DefaultPubSubConfig.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/config/DefaultPubSubConfig.java @@ -39,13 +39,11 @@ final class DefaultPubSubConfig implements PubSubConfig { private final String seed; private final Duration restartDelay; private final Duration updateInterval; - private final double forceUpdateProbability; private DefaultPubSubConfig(final ConfigWithFallback config) { seed = config.getString(ConfigValue.SEED.getConfigPath()); restartDelay = config.getDuration(ConfigValue.RESTART_DELAY.getConfigPath()); updateInterval = config.getDuration(ConfigValue.UPDATE_INTERVAL.getConfigPath()); - forceUpdateProbability = config.getDouble(ConfigValue.FORCE_UPDATE_PROBABILITY.getConfigPath()); } static PubSubConfig of(final Config config) { @@ -67,21 +65,12 @@ public Duration getUpdateInterval() { return updateInterval; } - @Override - public double getForceUpdateProbability() { - return forceUpdateProbability; - } - private String[] getFieldNames() { - return new String[]{ - "seed", "restartDelay", "updateInterval", "forceUpdateProbability" - }; + return new String[]{"seed", "restartDelay", "updateInterval"}; } private Object[] getFieldValues() { - return new Object[]{ - seed, restartDelay, updateInterval, forceUpdateProbability - }; + return new Object[]{seed, restartDelay, updateInterval}; } @Override diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/config/PubSubConfig.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/config/PubSubConfig.java index bbc40c6d94..7c7f5d8797 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/config/PubSubConfig.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/config/PubSubConfig.java @@ -41,12 +41,6 @@ public interface PubSubConfig { */ Duration getUpdateInterval(); - /** - * @return Probability of forcing an update on each clock tick to recover from - * temporary disassociation. - */ - double getForceUpdateProbability(); - /** * Create a {@code PubSubConfig} object from a {@code Config} object at the key {@code pubsub}. * @@ -91,14 +85,7 @@ enum ConfigValue implements KnownConfigValue { /** * How often to flush local subscriptions to the distributed data replicator. */ - UPDATE_INTERVAL("update-interval", Duration.ofSeconds(3L)), - - /** - * Probability to flush local subscriptions when there was no change to recover - * from temporary disassociation, during which a remove member may remove our subscriber - * from the distributed data when prompted by a cluster event MemberRemoved. - */ - FORCE_UPDATE_PROBABILITY("force-update-probability", 0.01); + UPDATE_INTERVAL("update-interval", Duration.ofSeconds(3L)); private final String path; private final Object defaultValue; diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractDDataHandler.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractDDataHandler.java index 60fe54217b..8f649800ca 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractDDataHandler.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractDDataHandler.java @@ -12,11 +12,9 @@ */ package org.eclipse.ditto.services.utils.pubsub.ddata; -import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.stream.Collectors; import org.eclipse.ditto.services.utils.ddata.DistributedData; import org.eclipse.ditto.services.utils.ddata.DistributedDataConfig; @@ -62,22 +60,7 @@ protected AbstractDDataHandler(final DistributedDataConfig config, public abstract CompletionStage removeAddress(Address address, Replicator.WriteConsistency writeConsistency); @Override - public abstract S approximate(final String topic); - - @Override - public Collection getSubscribers(final Map> mmap, - final Collection topic) { - return mmap.entrySet() - .stream() - .filter(entry -> topic.stream().anyMatch(entry.getValue()::contains)) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - } - - @Override - public CompletionStage> getSubscribers(final Collection topic) { - return read().thenApply(mmap -> getSubscribers(mmap, topic)); - } + public abstract long approximate(final String topic); @Override public CompletionStage>> read( @@ -99,22 +82,8 @@ public CompletionStage>> read( public CompletionStage put(final K ownSubscriber, final T topics, final Replicator.WriteConsistency writeConsistency) { - if (topics.shouldReplaceAll()) { - // complete replacement - return update(writeConsistency, mmap -> mmap.put(selfUniqueAddress, ownSubscriber, topics.getInserts())); - } else { - // incremental update - return update(writeConsistency, mmap -> { - ORMultiMap result = mmap; - for (final S inserted : topics.getInserts()) { - result = result.addBinding(selfUniqueAddress, ownSubscriber, inserted); - } - for (final S deleted : topics.getDeletes()) { - result = result.removeBinding(selfUniqueAddress, ownSubscriber, deleted); - } - return result; - }); - } + // complete replacement + return update(writeConsistency, mmap -> mmap.put(selfUniqueAddress, ownSubscriber, topics.getInserts())); } @Override diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptions.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptions.java index d4c0ff6c35..d0d8ea4de7 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptions.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptions.java @@ -22,11 +22,12 @@ import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import akka.actor.ActorRef; +import akka.japi.Pair; /** * Consistence-maintenance part of all subscriptions. @@ -35,38 +36,30 @@ * @param type of approximations of subscriptions for distributed update. */ @NotThreadSafe -public abstract class AbstractSubscriptions> implements Subscriptions { +public abstract class AbstractSubscriptions> implements Subscriptions { /** * Map from local subscribers to topics they subscribe to. */ - protected final Map> subscriberToTopic; - - /** - * Map from local subscribers to their topic filters. - */ - protected final Map>> subscriberToFilter; + protected final Map subscriberDataMap; /** * Map from topic to subscriber count and pre-computed hashes. */ - protected final Map> topicToData; + protected final Map> topicDataMap; /** * Construct subscriptions using the given maps. * Consistency between the maps is not checked. * - * @param subscriberToTopic map from subscribers to topics. - * @param subscriberToFilter map from subscribers to filters. - * @param topicToData map from topics to their data. + * @param subscriberDataMap map from subscribers to topics. + * @param topicDataMap map from topics to their data. */ protected AbstractSubscriptions( - final Map> subscriberToTopic, - final Map>> subscriberToFilter, - final Map> topicToData) { - this.subscriberToTopic = subscriberToTopic; - this.subscriberToFilter = subscriberToFilter; - this.topicToData = topicToData; + final Map subscriberDataMap, + final Map> topicDataMap) { + this.subscriberDataMap = subscriberDataMap; + this.topicDataMap = topicDataMap; } /** @@ -75,53 +68,30 @@ protected AbstractSubscriptions( * @param topic the topic. * @return the hash codes of the topic. */ - protected abstract S hashTopic(String topic); - - /** - * Callback on each new topic introduced into the subscriptions. - * - * @param newTopic the new topic. - */ - protected abstract void onNewTopic(TopicData newTopic); - - /** - * Callback on each topic removed from the subscriptions as a whole. - * - * @param removedTopic the new topic. - */ - protected abstract void onRemovedTopic(TopicData removedTopic); - - @Override - public Stream streamSubscribers(final String topic) { - final TopicData topicData = topicToData.get(topic); - return topicData != null ? topicData.streamSubscribers() : Stream.empty(); - } + public abstract S hashTopic(String topic); @Override public boolean subscribe(final ActorRef subscriber, final Set topics, - final Predicate> filter) { + final Predicate> filter, + @Nullable final String group) { if (!topics.isEmpty()) { // box the 'changed' flag in an array so that it can be assigned inside a closure. final boolean[] changed = new boolean[1]; - // change filter. - subscriberToFilter.compute(subscriber, (k, previousFilter) -> { - changed[0] = previousFilter != filter; - return filter; + // add topics and filter. + final SubscriberData subscriberData = SubscriberData.of(topics, filter, group); + subscriberDataMap.merge(subscriber, subscriberData, (oldData, newData) -> { + changed[0] = oldData.getFilter() != newData.getFilter(); + return newData.withTopics(unionSet(oldData.getTopics(), newData.getTopics())); }); - // add topics to subscriber - subscriberToTopic.merge(subscriber, topics, AbstractSubscriptions::unionSet); - // add subscriber for each new topic; detect whether there is any change. for (final String topic : topics) { - topicToData.compute(topic, (k, previousData) -> { + topicDataMap.compute(topic, (k, previousData) -> { if (previousData == null) { changed[0] = true; - final TopicData newTopic = TopicData.firstSubscriber(subscriber, hashTopic(topic)); - onNewTopic(newTopic); - return newTopic; + return TopicData.firstSubscriber(subscriber, hashTopic(topic)); } else { // no short-circuit evaluation for OR: subscriber should always be added. changed[0] |= previousData.addSubscriber(subscriber); @@ -132,12 +102,7 @@ public boolean subscribe(final ActorRef subscriber, return changed[0]; } else { // update filter if there are any existing topic subscribed - if (subscriberToTopic.containsKey(subscriber)) { - subscriberToFilter.put(subscriber, filter); - return true; - } else { - return false; - } + return null != subscriberDataMap.computeIfPresent(subscriber, (k, data) -> data.withFilter(filter)); } } @@ -145,7 +110,8 @@ public boolean subscribe(final ActorRef subscriber, public boolean unsubscribe(final ActorRef subscriber, final Set topics) { // box 'changed' flag for assignment inside closure final boolean[] changed = new boolean[1]; - subscriberToTopic.computeIfPresent(subscriber, (k, previousTopics) -> { + subscriberDataMap.computeIfPresent(subscriber, (k, subscriberData) -> { + final Set previousTopics = subscriberData.getTopics(); final List removed = new ArrayList<>(); final Set remaining = new HashSet<>(); for (final String topic : previousTopics) { @@ -159,10 +125,9 @@ public boolean unsubscribe(final ActorRef subscriber, final Set topics) removeSubscriberForTopics(subscriber, removed); if (remaining.isEmpty()) { // subscriber is removed - subscriberToFilter.remove(subscriber); return null; } else { - return remaining; + return subscriberData.withTopics(remaining); } }); return changed[0]; @@ -172,22 +137,21 @@ public boolean unsubscribe(final ActorRef subscriber, final Set topics) public boolean removeSubscriber(final ActorRef subscriber) { // box 'changed' flag in array for assignment inside closure final boolean[] changed = new boolean[1]; - subscriberToTopic.computeIfPresent(subscriber, (k, topics) -> { - changed[0] = removeSubscriberForTopics(subscriber, topics); + subscriberDataMap.computeIfPresent(subscriber, (k, data) -> { + changed[0] = removeSubscriberForTopics(subscriber, data.getTopics()); return null; }); - subscriberToFilter.remove(subscriber); return changed[0]; } @Override public boolean contains(final ActorRef subscriber) { - return subscriberToTopic.containsKey(subscriber); + return subscriberDataMap.containsKey(subscriber); } @Override public int countTopics() { - return topicToData.size(); + return topicDataMap.size(); } @Override @@ -196,11 +160,14 @@ public SubscriptionsReader snapshot() { } private Map>> exportSubscriberToFilter() { - return Map.copyOf(subscriberToFilter); + return subscriberDataMap.entrySet() + .stream() + .map(entry -> Pair.create(entry.getKey(), entry.getValue().getFilter())) + .collect(Collectors.toMap(Pair::first, Pair::second)); } private Map> exportTopicData() { - return Collections.unmodifiableMap(topicToData.entrySet() + return Collections.unmodifiableMap(topicDataMap.entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().exportSubscribers()))); } @@ -209,10 +176,9 @@ private boolean removeSubscriberForTopics(final ActorRef subscriber, final Colle // box 'changed' flag for assignment inside closure final boolean[] changed = new boolean[1]; for (final String topic : topics) { - topicToData.computeIfPresent(topic, (k, data) -> { + topicDataMap.computeIfPresent(topic, (k, data) -> { changed[0] |= data.removeSubscriber(subscriber); if (data.isEmpty()) { - onRemovedTopic(data); return null; } else { return data; @@ -231,10 +197,9 @@ private static Set unionSet(final Set s1, final Set s2) @Override public boolean equals(final Object other) { if (other instanceof AbstractSubscriptions) { - final AbstractSubscriptions that = (AbstractSubscriptions) other; - return subscriberToTopic.equals(that.subscriberToTopic) && - subscriberToFilter.equals(that.subscriberToFilter) && - topicToData.equals(that.topicToData); + final AbstractSubscriptions that = (AbstractSubscriptions) other; + return subscriberDataMap.equals(that.subscriberDataMap) && + topicDataMap.equals(that.topicDataMap); } else { return false; } @@ -242,7 +207,7 @@ public boolean equals(final Object other) { @Override public int hashCode() { - return Objects.hash(subscriberToTopic, subscriberToFilter, topicToData); + return Objects.hash(subscriberDataMap, topicDataMap); } } diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptionsUpdate.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptionsUpdate.java index 84c08f97f2..1951dd0397 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptionsUpdate.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptionsUpdate.java @@ -26,20 +26,14 @@ public abstract class AbstractSubscriptionsUpdate { private Set inserts; - private Set deletes; - private boolean replaceAll; /** * Create an IndelUpdate. * * @param inserts what to insert. - * @param deletes what to delete. - * @param replaceAll whether it is a replacement update. */ - protected AbstractSubscriptionsUpdate(final Set inserts, final Set deletes, final boolean replaceAll) { + protected AbstractSubscriptionsUpdate(final Set inserts) { this.inserts = inserts; - this.deletes = deletes; - this.replaceAll = replaceAll; } // TODO: javadoc @@ -51,30 +45,8 @@ public Set getInserts() { return inserts; } - @Override - public Set getDeletes() { - return deletes; - } - - @Override - public boolean shouldReplaceAll() { - return replaceAll; - } - protected void reset() { inserts = new HashSet<>(); - deletes = new HashSet<>(); - replaceAll = false; - } - - public void insert(final S newInserts) { - inserts.add(newInserts); - deletes.remove(newInserts); - } - - public void delete(final S newDeletes) { - inserts.remove(newDeletes); - deletes.add(newDeletes); } public T exportAndReset() { @@ -87,7 +59,7 @@ public T exportAndReset() { public boolean equals(final Object other) { if (getClass().isInstance(other)) { final AbstractSubscriptionsUpdate that = getClass().cast(other); - return replaceAll == that.replaceAll && inserts.equals(that.inserts) && deletes.equals(that.deletes); + return Objects.equals(inserts, that.inserts); } else { return false; } @@ -95,15 +67,13 @@ public boolean equals(final Object other) { @Override public int hashCode() { - return Objects.hash(inserts, deletes, replaceAll); + return Objects.hash(inserts); } @Override public String toString() { return getClass().getSimpleName() + " [" + "inserts=" + inserts + - ", deletes=" + deletes + - ", replaceAll=" + replaceAll + "]"; } } diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/DDataReader.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/DDataReader.java index 4e19a76f51..75fa288527 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/DDataReader.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/DDataReader.java @@ -12,7 +12,6 @@ */ package org.eclipse.ditto.services.utils.pubsub.ddata; -import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletionStage; @@ -29,23 +28,6 @@ */ public interface DDataReader { - /** - * Get subscribers from a list of topic hashes. - * - * @param multimap the state of the distributed data. - * @param topicHashes the hash codes of each topic. - * @return collection of subscribers with 1 or more topics. - */ - Collection getSubscribers(Map> multimap, Collection topicHashes); - - /** - * Get subscribers from a list of topic hashes. - * - * @param topicHashes the hash codes of each topic. - * @return future collection of subscribers whose Bloom filter contains all hashes of 1 or more topics. - */ - CompletionStage> getSubscribers(Collection topicHashes); - /** * Read a low-level map from the local replicator. * @@ -68,7 +50,7 @@ default CompletionStage>> read() { * @param topic the topic. * @return its approximation in the distributed data. */ - T approximate(String topic); + long approximate(String topic); /** * Start sending distributed data change events to the recipient. diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/DDataUpdate.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/DDataUpdate.java index 9f265dffdc..66faad593a 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/DDataUpdate.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/DDataUpdate.java @@ -28,14 +28,4 @@ public interface DDataUpdate { * @return Inserted elements. */ Set getInserts(); - - /** - * @return Deleted elements. - */ - Set getDeletes(); - - /** - * @return Whether the distributed data should clear all associations and replace them by inserts. - */ - boolean shouldReplaceAll(); } diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/Hashes.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/Hashes.java index a1993acf75..a6a21e07e5 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/Hashes.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/Hashes.java @@ -51,6 +51,17 @@ default List getHashes(final String string) { return getSeeds().stream().map(seed -> murmurHash(string, seed)).collect(Collectors.toList()); } + /** + * Hash a string topic into a long integer. + * + * @param topic the topic. + * @return the hashed topic. + */ + default Long hashAsLong(final String topic) { + final List hashes = getHashes(topic); + return ((long) hashes.get(0)) << 32 | hashes.get(1) & 0xffffffffL; + } + /** * Hash a string by a seeded Murmur-3 hash function. * diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/SubscriberData.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/SubscriberData.java new file mode 100644 index 0000000000..5c21bf2212 --- /dev/null +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/SubscriberData.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.services.utils.pubsub.ddata; + +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +/** + * Properties of a subscriber. + */ +@NotThreadSafe +public final class SubscriberData { + + private final Set topics; + private final Predicate> filter; + @Nullable private final String group; + + private SubscriberData(final Set topics, final Predicate> filter, + @Nullable final String group) { + this.topics = topics; + this.filter = filter; + this.group = group; + } + + /** + * Create subscriber data. + * + * @param topics topics the subscriber subscribes to. + * @param filter topic filter of the subscriber. + * @param group the group the subscriber belongs to, if any. + * @return the subscriber data. + */ + public static SubscriberData of(final Set topics, final Predicate> filter, + @Nullable final String group) { + return new SubscriberData(topics, filter, group); + } + + /** + * Create a copy of this object with topics replaced. + * + * @param topics the new topics. + * @return the new subscriber data. + */ + public SubscriberData withTopics(final Set topics) { + return new SubscriberData(topics, filter, group); + } + + /** + * Create a copy of this object with filter replaced. + * + * @param filter the new filter. + * @return the new subscriber data. + */ + public SubscriberData withFilter(final Predicate> filter) { + return new SubscriberData(topics, filter, group); + } + + /** + * @return topics the subscriber subscribes to. + */ + public Set getTopics() { + return topics; + } + + /** + * @return the filter of the subscriber. + */ + public Predicate> getFilter() { + return filter; + } + + /** + * @return the group the subscriber belongs to, or an empty optional. + */ + public Optional getGroup() { + return Optional.ofNullable(group); + } + + @Override + public boolean equals(final Object other) { + if (other instanceof SubscriberData) { + final SubscriberData that = (SubscriberData) other; + return Objects.equals(topics, that.topics) && + Objects.equals(filter, that.filter) && + Objects.equals(group, that.group); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(topics, filter, group); + } +} diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/Subscriptions.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/Subscriptions.java index bd95d2779a..168a1c7fc5 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/Subscriptions.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/Subscriptions.java @@ -15,7 +15,8 @@ import java.util.Collection; import java.util.Set; import java.util.function.Predicate; -import java.util.stream.Stream; + +import javax.annotation.Nullable; import akka.actor.ActorRef; @@ -41,33 +42,28 @@ public interface Subscriptions { */ boolean contains(ActorRef subscriber); - /** - * Check if a topic has any subscriber. - * - * @param topic the topic. - * @return whether some actor subscribes for the topic. - */ - Stream streamSubscribers(String topic); - /** * Subscribe for filtered messages published at any of the given topics. * * @param subscriber the subscriber. * @param topics topics the subscriber subscribes to. * @param filter filter for topics of incoming messages associated with the subscriber. + * @param group any group the subscriber belongs to, or null. * @return whether subscriptions changed. */ - boolean subscribe(ActorRef subscriber, Set topics, Predicate> filter); + boolean subscribe(ActorRef subscriber, Set topics, Predicate> filter, + @Nullable final String group); /** * Subscribe for all messages published at any of the given topics. * * @param subscriber the subscriber. * @param topics topics the subscriber subscribes to. + * @param group any group the subscriber belongs to, or null. * @return whether subscriptions changed. */ - default boolean subscribe(final ActorRef subscriber, final Set topics) { - return subscribe(subscriber, topics, ts -> true); + default boolean subscribe(final ActorRef subscriber, final Set topics, @Nullable final String group) { + return subscribe(subscriber, topics, ts -> true, group); } /** @@ -96,12 +92,10 @@ default boolean subscribe(final ActorRef subscriber, final Set topics) { /** * Export approximation of subscription data to be broadcast into the cluster. - * Clear the cache of accumulated changes if incremental update is supported. * - * @param forceUpdate whether this is a force update. Relevant for distributed data with incremental update only. * @return Approximation of all topics with subscribers for distributed data. */ - T export(boolean forceUpdate); + T export(); /** * @return whether there are no subscribers. diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedDData.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedDData.java index 9bc830c504..3b58d3c06b 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedDData.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedDData.java @@ -18,6 +18,7 @@ import org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader; import org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter; import org.eclipse.ditto.services.utils.pubsub.ddata.Subscriptions; +import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralUpdate; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -26,7 +27,7 @@ /** * Access to distributed data of compressed topics. */ -public final class CompressedDData implements DData { +public final class CompressedDData implements DData { private final CompressedDDataHandler handler; @@ -57,17 +58,17 @@ public static CompressedDData of(final CompressedDDataHandler extension) { } @Override - public DDataReader getReader() { + public DDataReader getReader() { return handler; } @Override - public DDataWriter getWriter() { + public DDataWriter getWriter() { return handler; } @Override - public Subscriptions createSubscriptions() { + public Subscriptions createSubscriptions() { return CompressedSubscriptions.of(handler.getSeeds()); } @@ -75,7 +76,7 @@ public Subscriptions createSubscriptions() { * Abstract class of distributed data extension provider to be instantiated at user site. */ public abstract static class Provider - extends DistributedData.AbstractDDataProvider, CompressedDDataHandler> { + extends DistributedData.AbstractDDataProvider, CompressedDDataHandler> { /** * Get the ddata extension's config from an actor system. diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedDDataHandler.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedDDataHandler.java index cbbec86b0c..c10ffb14f6 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedDDataHandler.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedDDataHandler.java @@ -20,6 +20,7 @@ import org.eclipse.ditto.services.utils.pubsub.config.PubSubConfig; import org.eclipse.ditto.services.utils.pubsub.ddata.AbstractDDataHandler; import org.eclipse.ditto.services.utils.pubsub.ddata.Hashes; +import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralUpdate; import akka.actor.ActorRef; import akka.actor.ActorRefFactory; @@ -32,7 +33,7 @@ * A distributed collection of hashes of strings indexed by ActorRef. * The hash functions for all filter should be identical. */ -public final class CompressedDDataHandler extends AbstractDDataHandler +public final class CompressedDDataHandler extends AbstractDDataHandler implements Hashes { private final List seeds; @@ -78,15 +79,15 @@ public List getSeeds() { * @return the compressed topic. */ @Override - public Long approximate(final String topic) { - return hashCodesToLong(getHashes(topic)); + public long approximate(final String topic) { + return hashAsLong(topic); } @Override public CompletionStage removeAddress(final Address address, final Replicator.WriteConsistency writeConsistency) { return update(writeConsistency, mmap -> { - ORMultiMap result = mmap; + ORMultiMap result = mmap; for (final ActorRef subscriber : mmap.getEntries().keySet()) { if (subscriber.path().address().equals(address)) { result = result.remove(selfUniqueAddress, subscriber); @@ -96,7 +97,4 @@ public CompletionStage removeAddress(final Address address, }); } - static long hashCodesToLong(final List hashes) { - return ((long) hashes.get(0)) << 32 | hashes.get(1) & 0xffffffffL; - } } diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedSubscriptions.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedSubscriptions.java index 5afe520b6d..1ff66feac1 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedSubscriptions.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedSubscriptions.java @@ -14,16 +14,20 @@ import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.function.Predicate; +import java.util.stream.Collectors; import javax.annotation.concurrent.NotThreadSafe; import org.eclipse.ditto.services.utils.pubsub.ddata.AbstractSubscriptions; import org.eclipse.ditto.services.utils.pubsub.ddata.Hashes; +import org.eclipse.ditto.services.utils.pubsub.ddata.SubscriberData; import org.eclipse.ditto.services.utils.pubsub.ddata.TopicData; +import org.eclipse.ditto.services.utils.pubsub.ddata.ack.Grouped; +import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralUpdate; import akka.actor.ActorRef; @@ -31,27 +35,20 @@ * Local subscriptions for distribution of subscribed topics as hash code sequences. */ @NotThreadSafe -public final class CompressedSubscriptions extends AbstractSubscriptions +public final class CompressedSubscriptions extends AbstractSubscriptions implements Hashes { /** * Seeds of hash functions. They should be identical cluster-wide. */ private final Collection seeds; - private final Map hashCodeToTopicCount; - private final CompressedUpdate updates; private CompressedSubscriptions( final Collection seeds, - final Map> subscriberToTopic, - final Map>> subscriberToFilter, - final Map> topicToData, - final Map hashCodeToTopicCount, - final CompressedUpdate updates) { - super(subscriberToTopic, subscriberToFilter, topicToData); + final Map subscriberDataMap, + final Map> topicToData) { + super(subscriberDataMap, topicToData); this.seeds = seeds; - this.hashCodeToTopicCount = hashCodeToTopicCount; - this.updates = updates; } /** @@ -61,37 +58,12 @@ private CompressedSubscriptions( * @return the compressed subscriptions object. */ public static CompressedSubscriptions of(final Collection seeds) { - return new CompressedSubscriptions(seeds, new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), - CompressedUpdate.empty()); + return new CompressedSubscriptions(seeds, new HashMap<>(), new HashMap<>()); } @Override - protected Long hashTopic(final String topic) { - return CompressedDDataHandler.hashCodesToLong(getHashes(topic)); - } - - @Override - protected void onNewTopic(final TopicData newTopic) { - hashCodeToTopicCount.compute(newTopic.getHashes(), (hashes, count) -> { - if (count == null) { - updates.insert(hashes); - return 1; - } else { - return count + 1; - } - }); - } - - @Override - protected void onRemovedTopic(final TopicData removedTopic) { - hashCodeToTopicCount.computeIfPresent(removedTopic.getHashes(), (hashes, count) -> { - if (count > 1) { - return count - 1; - } else { - updates.delete(hashes); - return null; - } - }); + public Long hashTopic(final String topic) { + return hashAsLong(topic); } @Override @@ -100,22 +72,24 @@ public Collection getSeeds() { } @Override - public CompressedUpdate export(final boolean forceUpdate) { - if (forceUpdate) { - return CompressedUpdate.replaceAll(hashCodeToTopicCount.keySet()); - } else { - return updates.exportAndReset(); - } + public LiteralUpdate export() { + final Set serializedGroupedTopics = new HashSet<>(); + subscriberDataMap.forEach((subscriber, data) -> { + final Set topicHashes = data.getTopics() + .stream() + .map(this::hashAsLong) + .collect(Collectors.toSet()); + final Grouped groupedHashes = Grouped.of(data.getGroup().orElse(null), topicHashes); + serializedGroupedTopics.add(groupedHashes.toJsonString()); + }); + return LiteralUpdate.replaceAll(serializedGroupedTopics); } @Override public boolean equals(final Object other) { if (other instanceof CompressedSubscriptions) { final CompressedSubscriptions that = (CompressedSubscriptions) other; - return seeds.equals(that.seeds) && - hashCodeToTopicCount.equals(that.hashCodeToTopicCount) && - updates.equals(that.updates) && - super.equals(other); + return seeds.equals(that.seeds) && super.equals(other); } else { return false; } @@ -123,6 +97,7 @@ public boolean equals(final Object other) { @Override public int hashCode() { - return Objects.hash(seeds, hashCodeToTopicCount, updates, super.hashCode()); + return Objects.hash(seeds, super.hashCode()); } + } diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedUpdate.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedUpdate.java deleted file mode 100644 index 0cf2035110..0000000000 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedUpdate.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2019 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.ditto.services.utils.pubsub.ddata.compressed; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import javax.annotation.concurrent.NotThreadSafe; - -import org.eclipse.ditto.services.utils.pubsub.ddata.AbstractSubscriptionsUpdate; - -/** - * Updates of compressed DData. - */ -@NotThreadSafe -public final class CompressedUpdate extends AbstractSubscriptionsUpdate { - - private CompressedUpdate(final Set inserts, final Set deletes, final boolean replaceAll) { - super(inserts, deletes, replaceAll); - } - - /** - * @return An empty update. - */ - public static CompressedUpdate empty() { - return new CompressedUpdate(new HashSet<>(), new HashSet<>(), false); - } - - /** - * Replace everything associated with a subscriber in the distributed data. - * - * @param inserts topics to insert. - * @return an immutable update object. - */ - public static CompressedUpdate replaceAll(final Set inserts) { - final Set copyOfInserts = Set.copyOf(inserts); - return new CompressedUpdate(copyOfInserts, Collections.emptySet(), true); - } - - @Override - public CompressedUpdate snapshot() { - return new CompressedUpdate(Set.copyOf(getInserts()), Set.copyOf(getDeletes()), shouldReplaceAll()); - } -} diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralDDataHandler.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralDDataHandler.java index ee356e53f5..47d6011ce9 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralDDataHandler.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralDDataHandler.java @@ -53,8 +53,8 @@ public static LiteralDDataHandler create(final ActorSystem system, } @Override - public String approximate(final String topic) { - return topic; + public long approximate(final String topic) { + return topic.hashCode(); } @Override diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralSubscriptions.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralSubscriptions.java index d63272c8ba..7159b96ca7 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralSubscriptions.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralSubscriptions.java @@ -12,16 +12,14 @@ */ package org.eclipse.ditto.services.utils.pubsub.ddata.literal; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.function.Predicate; import javax.annotation.concurrent.NotThreadSafe; import org.eclipse.ditto.services.utils.pubsub.ddata.AbstractSubscriptions; +import org.eclipse.ditto.services.utils.pubsub.ddata.SubscriberData; import org.eclipse.ditto.services.utils.pubsub.ddata.TopicData; import akka.actor.ActorRef; @@ -30,16 +28,15 @@ * Local subscriptions for distribution of subscribed topics as hash code sequences. */ @NotThreadSafe -public final class LiteralSubscriptions extends AbstractSubscriptions { +public final class LiteralSubscriptions extends AbstractSubscriptions { private final LiteralUpdate updates; private LiteralSubscriptions( - final Map> subscriberToTopic, - final Map>> subscriberToFilter, + final Map subscriberDataMap, final Map> topicToData, final LiteralUpdate updates) { - super(subscriberToTopic, subscriberToFilter, topicToData); + super(subscriberDataMap, topicToData); this.updates = updates; } @@ -49,31 +46,17 @@ private LiteralSubscriptions( * @return the subscriptions object. */ public static LiteralSubscriptions newInstance() { - return new LiteralSubscriptions(new HashMap<>(), new HashMap<>(), new HashMap<>(), LiteralUpdate.empty()); + return new LiteralSubscriptions(new HashMap<>(), new HashMap<>(), LiteralUpdate.empty()); } @Override - protected String hashTopic(final String topic) { + public String hashTopic(final String topic) { return topic; } @Override - protected void onNewTopic(final TopicData newTopic) { - // nothing to do - } - - @Override - protected void onRemovedTopic(final TopicData removedTopic) { - // nothing to do - } - - @Override - public LiteralUpdate export(final boolean forceUpdate) { - if (forceUpdate) { - return LiteralUpdate.replaceAll(topicToData.keySet()); - } else { - return updates.exportAndReset(); - } + public LiteralUpdate export() { + return LiteralUpdate.replaceAll(topicDataMap.keySet()); } @Override diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralUpdate.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralUpdate.java index 9bbec106dd..77ef6f7954 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralUpdate.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/literal/LiteralUpdate.java @@ -13,29 +13,30 @@ package org.eclipse.ditto.services.utils.pubsub.ddata.literal; -import java.util.Collections; import java.util.HashSet; import java.util.Set; import javax.annotation.concurrent.NotThreadSafe; import org.eclipse.ditto.services.utils.pubsub.ddata.AbstractSubscriptionsUpdate; +import org.eclipse.ditto.services.utils.pubsub.ddata.DDataUpdate; /** * Updates of uncompressed DData. */ @NotThreadSafe -public final class LiteralUpdate extends AbstractSubscriptionsUpdate { +public final class LiteralUpdate extends AbstractSubscriptionsUpdate + implements DDataUpdate { - private LiteralUpdate(final Set inserts, final Set deletes, final boolean replaceAll) { - super(inserts, deletes, replaceAll); + private LiteralUpdate(final Set inserts) { + super(inserts); } /** * @return An empty update. */ public static LiteralUpdate empty() { - return new LiteralUpdate(new HashSet<>(), new HashSet<>(), false); + return new LiteralUpdate(new HashSet<>()); } /** @@ -46,11 +47,11 @@ public static LiteralUpdate empty() { */ public static LiteralUpdate replaceAll(final Set inserts) { final Set copyOfInserts = Set.copyOf(inserts); - return new LiteralUpdate(copyOfInserts, Collections.emptySet(), true); + return new LiteralUpdate(copyOfInserts); } @Override public LiteralUpdate snapshot() { - return new LiteralUpdate(Set.copyOf(getInserts()), Set.copyOf(getDeletes()), shouldReplaceAll()); + return new LiteralUpdate(Set.copyOf(getInserts())); } } diff --git a/services/utils/pubsub/src/main/resources/reference.conf b/services/utils/pubsub/src/main/resources/reference.conf index 682069fcd8..683962e55d 100644 --- a/services/utils/pubsub/src/main/resources/reference.conf +++ b/services/utils/pubsub/src/main/resources/reference.conf @@ -8,12 +8,6 @@ ditto { update-interval = 3s update-interval = ${?DITTO_PUBSUB_UPDATE_INTERVAL} - // About once every 500s, push own subscriptions into the cluster no matter what, - // to recover from disassociation or delta update failure. - // Setting it to 1 disables delta update. - force-update-probability = 0.01 - force-update-probability = ${?DITTO_PUBSUB_FORCE_UPDATE_PROBABILITY} - // seed of hash functions; must be identical across the cluster for pub-sub to work. // rotate when paranoid about collision attacks. seed = """Two households, both alike in dignity, diff --git a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/PubSubFactoryTest.java b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/PubSubFactoryTest.java index e31499a991..38f64aa50f 100644 --- a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/PubSubFactoryTest.java +++ b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/PubSubFactoryTest.java @@ -35,7 +35,6 @@ import org.eclipse.ditto.model.base.acks.AcknowledgementRequest; import org.eclipse.ditto.model.base.headers.DittoHeaders; import org.eclipse.ditto.model.things.ThingId; -import org.eclipse.ditto.services.utils.pubsub.actors.SubscriptionsChanged; import org.eclipse.ditto.services.utils.pubsub.api.LocalAcksChanged; import org.eclipse.ditto.services.utils.pubsub.api.SubAck; import org.eclipse.ditto.services.utils.pubsub.api.Subscribe; @@ -212,7 +211,7 @@ public void watchForLocalActorTermination() { // THEN: the subscriber is removed Awaitility.await().untilAsserted(() -> - assertThat(factory1.getSubscribers("hello").toCompletableFuture().join()) + assertThat(factory1.getSubscribers().toCompletableFuture().join()) .describedAs("subscriber should be removed from ddata after termination") .isEmpty() ); @@ -242,7 +241,7 @@ public void removeSubscriberOfRemovedClusterMember() { // THEN: the subscriber is removed Awaitility.await().untilAsserted(() -> - assertThat(factory1.getSubscribers("hello").toCompletableFuture().join()) + assertThat(factory1.getSubscribers().toCompletableFuture().join()) .describedAs("subscriber should be removed from ddata") .isEmpty()); }}; diff --git a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/TestPubSubFactory.java b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/TestPubSubFactory.java index d53a067792..86d51f06e4 100644 --- a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/TestPubSubFactory.java +++ b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/TestPubSubFactory.java @@ -13,13 +13,12 @@ package org.eclipse.ditto.services.utils.pubsub; import java.util.Collection; -import java.util.Collections; +import java.util.Map; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.eclipse.ditto.services.utils.pubsub.config.PubSubConfig; -import org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader; import org.eclipse.ditto.services.utils.pubsub.ddata.Hashes; import org.eclipse.ditto.services.utils.pubsub.extractors.AckExtractor; import org.eclipse.ditto.services.utils.pubsub.extractors.PubSubTopicExtractor; @@ -59,20 +58,8 @@ static TestPubSubFactory of(final ActorContext context, final AckExtractor> getSubscribers(final String topic) { - return getSubscribers(Collections.singleton(topic), ddata.getReader()); - } - - /** - * Retrieve subscribers of a collection of topics from the distributed data. - * Useful for circumventing lackluster existential type implementation when the reader type parameter isn't known. - * - * @param topics the topics. - * @return subscribers of those topics in the distributed data. - */ - private static CompletionStage> getSubscribers( - final Collection topics, final DDataReader reader) { - return reader.getSubscribers(topics.stream().map(reader::approximate).collect(Collectors.toSet())); + CompletionStage> getSubscribers() { + return ddata.getReader().read().thenApply(Map::keySet); } @Override diff --git a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/config/DefaultPubSubConfigTest.java b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/config/DefaultPubSubConfigTest.java index 1c67862155..3b05412511 100644 --- a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/config/DefaultPubSubConfigTest.java +++ b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/config/DefaultPubSubConfigTest.java @@ -18,7 +18,6 @@ import java.time.Duration; import org.assertj.core.api.JUnitSoftAssertions; -import org.assertj.core.data.Percentage; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -68,10 +67,6 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() { softly.assertThat(underTest.getUpdateInterval()) .as(PubSubConfig.ConfigValue.UPDATE_INTERVAL.getConfigPath()) .isEqualTo(Duration.ofSeconds(3L)); - - softly.assertThat(underTest.getForceUpdateProbability()) - .as(PubSubConfig.ConfigValue.FORCE_UPDATE_PROBABILITY.getConfigPath()) - .isCloseTo(0.01, Percentage.withPercentage(1.0)); } @Test @@ -89,10 +84,6 @@ public void underTestReturnsValuesOfConfigFile() { softly.assertThat(underTest.getUpdateInterval()) .as(PubSubConfig.ConfigValue.UPDATE_INTERVAL.getConfigPath()) .isEqualTo(Duration.ofSeconds(4L)); - - softly.assertThat(underTest.getForceUpdateProbability()) - .as(PubSubConfig.ConfigValue.FORCE_UPDATE_PROBABILITY.getConfigPath()) - .isCloseTo(0.011, Percentage.withPercentage(1.0)); } } diff --git a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptionsTest.java b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptionsTest.java index bd48ec2469..7addbc88ff 100644 --- a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptionsTest.java +++ b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/ddata/AbstractSubscriptionsTest.java @@ -32,7 +32,7 @@ * @param type of distributed updates. * @param type of subscriptions objects. */ -public abstract class AbstractSubscriptionsTest, S extends AbstractSubscriptions> { +public abstract class AbstractSubscriptionsTest, S extends AbstractSubscriptions> { protected static final ActorRef ACTOR1 = new MockActorRef("actor1"); protected static final ActorRef ACTOR2 = new MockActorRef("actor2"); @@ -49,23 +49,23 @@ public abstract class AbstractSubscriptionsTest, S e */ protected S getVennDiagram() { final S subscriptions = newSubscriptions(); - subscriptions.subscribe(ACTOR1, asSet("1", "2", "4", "5")); - subscriptions.subscribe(ACTOR2, asSet("2", "3", "5", "6")); - subscriptions.subscribe(ACTOR3, asSet("4", "5", "6", "7")); + subscriptions.subscribe(ACTOR1, asSet("1", "2", "4", "5"), null); + subscriptions.subscribe(ACTOR2, asSet("2", "3", "5", "6"), null); + subscriptions.subscribe(ACTOR3, asSet("4", "5", "6", "7"), null); return subscriptions; } @Test public void createEmptySubscriptions() { final int hashFamilySize = 8; - final AbstractSubscriptions underTest = newSubscriptions(); - assertThat(underTest.subscriberToTopic.isEmpty()).isTrue(); - assertThat(underTest.topicToData.isEmpty()).isTrue(); + final AbstractSubscriptions underTest = newSubscriptions(); + assertThat(underTest.subscriberDataMap.isEmpty()).isTrue(); + assertThat(underTest.topicDataMap.isEmpty()).isTrue(); } @Test public void testVennDiagramMembership() { - final AbstractSubscriptions underTest = getVennDiagram(); + final AbstractSubscriptions underTest = getVennDiagram(); final SubscriptionsReader reader = underTest.snapshot(); assertThat(reader.getSubscribers(singleton("1"))).containsExactlyInAnyOrder(ACTOR1); assertThat(reader.getSubscribers(singleton("2"))).containsExactlyInAnyOrder(ACTOR1, ACTOR2); @@ -74,15 +74,15 @@ public void testVennDiagramMembership() { assertThat(reader.getSubscribers(singleton("5"))).containsExactlyInAnyOrder(ACTOR1, ACTOR2, ACTOR3); assertThat(reader.getSubscribers(singleton("6"))).containsExactlyInAnyOrder(ACTOR2, ACTOR3); assertThat(reader.getSubscribers(singleton("7"))).containsExactlyInAnyOrder(ACTOR3); - assertThat(underTest.subscriberToTopic.size()).isEqualTo(3); - assertThat(underTest.topicToData.size()).isEqualTo(7); + assertThat(underTest.subscriberDataMap.size()).isEqualTo(3); + assertThat(underTest.topicDataMap.size()).isEqualTo(7); } @Test public void testVennDiagramWithFilter() { - final AbstractSubscriptions underTest = getVennDiagram(); - underTest.subscribe(ACTOR1, Collections.emptySet(), topics -> topics.contains("1")); - underTest.subscribe(ACTOR2, Collections.emptySet(), topics -> !topics.contains("6")); + final AbstractSubscriptions underTest = getVennDiagram(); + underTest.subscribe(ACTOR1, Collections.emptySet(), topics -> topics.contains("1"), null); + underTest.subscribe(ACTOR2, Collections.emptySet(), topics -> !topics.contains("6"), null); final SubscriptionsReader reader = underTest.snapshot(); assertThat(reader.getSubscribers(singleton("1"))).containsExactlyInAnyOrder(ACTOR1); assertThat(reader.getSubscribers(singleton("2"))).containsExactlyInAnyOrder(ACTOR2); @@ -95,13 +95,13 @@ public void testVennDiagramWithFilter() { @Test public void testVennDiagramMembershipAfterRotation() { - final AbstractSubscriptions underTest = getVennDiagram(); - underTest.subscribe(ACTOR1, singleton("3")); - underTest.subscribe(ACTOR1, singleton("6")); - underTest.subscribe(ACTOR2, singleton("4")); - underTest.subscribe(ACTOR2, singleton("7")); - underTest.subscribe(ACTOR3, singleton("1")); - underTest.subscribe(ACTOR3, singleton("2")); + final AbstractSubscriptions underTest = getVennDiagram(); + underTest.subscribe(ACTOR1, singleton("3"), null); + underTest.subscribe(ACTOR1, singleton("6"), null); + underTest.subscribe(ACTOR2, singleton("4"), null); + underTest.subscribe(ACTOR2, singleton("7"), null); + underTest.subscribe(ACTOR3, singleton("1"), null); + underTest.subscribe(ACTOR3, singleton("2"), null); underTest.unsubscribe(ACTOR1, singleton("1")); underTest.unsubscribe(ACTOR1, singleton("4")); underTest.unsubscribe(ACTOR2, singleton("2")); @@ -120,18 +120,18 @@ public void testVennDiagramMembershipAfterRotation() { @Test public void testVennDiagramMembershipAfterAnotherRotation() { - final AbstractSubscriptions underTest = getVennDiagram(); - underTest.subscribe(ACTOR1, singleton("3")); + final AbstractSubscriptions underTest = getVennDiagram(); + underTest.subscribe(ACTOR1, singleton("3"), null); underTest.unsubscribe(ACTOR1, singleton("1")); underTest.unsubscribe(ACTOR1, singleton("4")); underTest.unsubscribe(ACTOR2, singleton("2")); underTest.unsubscribe(ACTOR2, singleton("3")); - underTest.subscribe(ACTOR1, singleton("6")); - underTest.subscribe(ACTOR2, singleton("4")); - underTest.subscribe(ACTOR2, singleton("7")); - underTest.subscribe(ACTOR3, singleton("1")); + underTest.subscribe(ACTOR1, singleton("6"), null); + underTest.subscribe(ACTOR2, singleton("4"), null); + underTest.subscribe(ACTOR2, singleton("7"), null); + underTest.subscribe(ACTOR3, singleton("1"), null); underTest.unsubscribe(ACTOR3, singleton("6")); - underTest.subscribe(ACTOR3, singleton("2")); + underTest.subscribe(ACTOR3, singleton("2"), null); underTest.unsubscribe(ACTOR3, singleton("7")); final SubscriptionsReader reader = underTest.snapshot(); assertThat(reader.getSubscribers(singleton("1"))).containsExactlyInAnyOrder(ACTOR3); @@ -145,7 +145,7 @@ public void testVennDiagramMembershipAfterAnotherRotation() { @Test public void testSubscriberRemoval() { - final AbstractSubscriptions underTest = getVennDiagram(); + final AbstractSubscriptions underTest = getVennDiagram(); underTest.removeSubscriber(ACTOR1); underTest.removeSubscriber(ACTOR2); final SubscriptionsReader reader = underTest.snapshot(); @@ -156,25 +156,25 @@ public void testSubscriberRemoval() { assertThat(reader.getSubscribers(singleton("5"))).containsExactlyInAnyOrder(ACTOR3); assertThat(reader.getSubscribers(singleton("6"))).containsExactlyInAnyOrder(ACTOR3); assertThat(reader.getSubscribers(singleton("7"))).containsExactlyInAnyOrder(ACTOR3); - assertThat(underTest.subscriberToTopic.size()).isEqualTo(1); - assertThat(underTest.topicToData.size()).isEqualTo(4); + assertThat(underTest.subscriberDataMap.size()).isEqualTo(1); + assertThat(underTest.topicDataMap.size()).isEqualTo(4); } @Test public void testSnapshot() { // GIVEN: A snapshot is taken - final AbstractSubscriptions underTest = getVennDiagram(); + final AbstractSubscriptions underTest = getVennDiagram(); final SubscriptionsReader snapshot = underTest.snapshot(); // THEN: snapshot cannot be modified but can be queried assertThat(snapshot.getSubscribers(Arrays.asList("1", "2", "3"))).containsExactlyInAnyOrder(ACTOR1, ACTOR2); // WHEN: the original mutates - underTest.subscribe(ACTOR1, singleton("8")); + underTest.subscribe(ACTOR1, singleton("8"), null); underTest.unsubscribe(ACTOR1, singleton("1")); underTest.removeSubscriber(ACTOR2); - assertThat(underTest.subscriberToTopic.size()).isEqualTo(2); - assertThat(underTest.topicToData.size()).isEqualTo(6); + assertThat(underTest.subscriberDataMap.size()).isEqualTo(2); + assertThat(underTest.topicDataMap.size()).isEqualTo(6); // THEN: snapshot should not change assertThat(snapshot).isEqualTo(getVennDiagram().snapshot()); @@ -183,10 +183,10 @@ public void testSnapshot() { @Test public void changeDetectionIsAccurate() { - final AbstractSubscriptions underTest = getVennDiagram(); + final AbstractSubscriptions underTest = getVennDiagram(); - assertThat(underTest.subscribe(ACTOR1, asSet("1", "2"))).isFalse(); - assertThat(underTest.subscribe(ACTOR1, asSet("2", "3"))).isTrue(); + assertThat(underTest.subscribe(ACTOR1, asSet("1", "2"), null)).isFalse(); + assertThat(underTest.subscribe(ACTOR1, asSet("2", "3"), null)).isTrue(); assertThat(underTest.unsubscribe(ACTOR2, asSet("1", "4"))).isFalse(); assertThat(underTest.unsubscribe(ACTOR2, asSet("2", "3", "5", "7"))).isTrue(); assertThat(underTest.removeSubscriber(ACTOR4)).isFalse(); diff --git a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedSubscriptionsTest.java b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedSubscriptionsTest.java deleted file mode 100644 index 6055c324d4..0000000000 --- a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/ddata/compressed/CompressedSubscriptionsTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2019 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.ditto.services.utils.pubsub.ddata.compressed; - -import static java.util.Collections.singleton; -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.Arrays; -import java.util.stream.IntStream; - -import org.eclipse.ditto.services.utils.pubsub.ddata.AbstractSubscriptionsTest; -import org.junit.Test; - -/** - * Tests {@link org.eclipse.ditto.services.utils.pubsub.ddata.compressed.CompressedSubscriptions}. - */ -public final class CompressedSubscriptionsTest - extends AbstractSubscriptionsTest { - - @Override - protected CompressedSubscriptions newSubscriptions() { - return CompressedSubscriptions.of(Arrays.asList(1, 2, 3)); - } - - @Test - public void exportAllSubscriptions() { - final CompressedSubscriptions underTest = getVennDiagram(); - final CompressedUpdate update = underTest.export(true); - assertThat(update.shouldReplaceAll()).isTrue(); - assertThat(update.getDeletes()).isEmpty(); - assertThat(update.getInserts()).containsExactlyInAnyOrder( - IntStream.rangeClosed(1, 7) - .mapToObj(i -> underTest.hashTopic(String.valueOf(i))) - .toArray(Long[]::new) - ); - } - - @Test - public void exportIncrementalUpdates() { - final CompressedSubscriptions underTest = getVennDiagram(); - - // update 1: construction of Venn diagram. - final CompressedUpdate update1 = underTest.export(false); - assertThat(update1.shouldReplaceAll()).isFalse(); - assertThat(update1.getInserts()).containsExactlyInAnyOrder( - IntStream.rangeClosed(1, 7) - .mapToObj(i -> underTest.hashTopic(String.valueOf(i))) - .toArray(Long[]::new) - ); - assertThat(update1.getDeletes()).isEmpty(); - - // update 2: unsubscription of topic. - underTest.unsubscribe(ACTOR1, singleton("1")); - final CompressedUpdate update2 = underTest.export(false); - assertThat(update2.shouldReplaceAll()).isFalse(); - assertThat(update2.getInserts()).isEmpty(); - assertThat(update2.getDeletes()).containsExactlyInAnyOrder(underTest.hashTopic("1")); - - // update 3: removal of subscriber. - underTest.removeSubscriber(ACTOR3); - final CompressedUpdate update3 = underTest.export(false); - assertThat(update3.shouldReplaceAll()).isFalse(); - assertThat(update3.getInserts()).isEmpty(); - assertThat(update3.getDeletes()).containsExactlyInAnyOrder(underTest.hashTopic("7")); - - // update 4: changes that do not affect the ddata - underTest.subscribe(ACTOR2, singleton("4")); - underTest.removeSubscriber(ACTOR1); - final CompressedUpdate update4 = underTest.export(false); - assertThat(update4.shouldReplaceAll()).isFalse(); - assertThat(update4.getInserts()).isEmpty(); - assertThat(update4.getDeletes()).isEmpty(); - } -}