Skip to content

Commit

Permalink
Issue #878: add group to topics ddata; remove incrememental update.
Browse files Browse the repository at this point in the history
Incremental update removed due to losing granuality of topics
in the distributed data.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 20, 2020
1 parent 79e6eb1 commit 12800d6
Show file tree
Hide file tree
Showing 31 changed files with 359 additions and 649 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ final class DistributedSubImpl implements DistributedSub {
@Override
public CompletionStage<SubAck> subscribeWithFilterAndAck(final Collection<String> topics,
final ActorRef subscriber, final Predicate<Collection<String>> filter) {
// TODO: check group is non-empty if non-null
final Subscribe subscribe =
Subscribe.of(new HashSet<>(topics), subscriber, writeAll, true, filter);
Subscribe.of(new HashSet<>(topics), subscriber, writeAll, true, filter, null);
return askSubSupervisor(subscribe);
}

@Override
public CompletionStage<SubAck> subscribeWithAck(final Collection<String> topics,
final ActorRef subscriber) {
return askSubSupervisor(Subscribe.of(new HashSet<>(topics), subscriber, writeAll, true));
return askSubSupervisor(Subscribe.of(new HashSet<>(topics), subscriber, writeAll, true, null));
}

@Override
Expand All @@ -84,7 +85,7 @@ private CompletionStage<SubAck> askSubSupervisor(final Request request) {
public void subscribeWithoutAck(final Collection<String> topics, final ActorRef subscriber) {
final Request request =
Subscribe.of(new HashSet<>(topics), subscriber,
(Replicator.WriteConsistency) Replicator.writeLocal(), false);
(Replicator.WriteConsistency) Replicator.writeLocal(), false, null);
subSupervisor.tell(request, subscriber);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@
*/
abstract class AbstractUpdater<K, T, P> extends AbstractActorWithTimers {

// pseudo-random number generator for force updates. quality matters little.
private final Random random = new Random();

protected final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

protected final PubSubConfig config;
Expand Down Expand Up @@ -160,13 +157,6 @@ protected void updateFailure(final Status.Failure failure) {
localSubscriptionsChanged = true;
}

/**
* @return whether the next update is a random force update.
*/
protected boolean forceUpdate() {
return random.nextDouble() < config.getForceUpdateProbability();
}

/**
* Add a request to the queue to be handled after cluster update.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,14 @@ private boolean isAllowedRemotelyBy(final Grouped<String> groupedLabels,
private boolean isAllowedRemotelyBy(@Nullable final String group, final Set<String> ackLabels,
final Map<String, Set<String>> remoteGroups,
final Predicate<String> isTakenRemotely) {
if (group != null) {
final boolean noConflict = noDeclaredLabelMatches(ackLabels, isTakenRemotely);
if (noConflict && group != null) {
final Set<String> remoteGroup = remoteGroups.get(group);
if (remoteGroup != null) {
return remoteGroup.equals(ackLabels);
}
}
return noDeclaredLabelMatches(ackLabels, isTakenRemotely);
return noConflict;
}

private boolean noDeclaredLabelMatches(final Set<String> ackLabels, final Predicate<String> contains) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.AcknowledgementRequest;
import org.eclipse.ditto.model.base.entity.id.EntityIdWithType;
Expand All @@ -28,6 +31,7 @@
import org.eclipse.ditto.services.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.services.utils.pubsub.api.RemoteAcksChanged;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader;
import org.eclipse.ditto.services.utils.pubsub.ddata.ack.Grouped;
import org.eclipse.ditto.services.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;

Expand All @@ -36,35 +40,33 @@
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.ddata.Replicator;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import scala.collection.immutable.Set;
import scala.jdk.javaapi.CollectionConverters;

/**
* Publishes messages according to topic Bloom filters.
*
* @param <T> representation of topics in the distributed data.
* Publishes messages according to topic distributed data.
*/
public final class Publisher<T> extends AbstractActor {
public final class Publisher extends AbstractActor {

/**
* Prefix of this actor's name.
*/
public static final String ACTOR_NAME_PREFIX = "publisher";
public static final java.lang.String ACTOR_NAME_PREFIX = "publisher";

private final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

private final DDataReader<ActorRef, T> ddataReader;
private final DDataReader<ActorRef, String> ddataReader;

private final Counter messageCounter = DittoMetrics.counter("pubsub-published-messages");
private final Counter topicCounter = DittoMetrics.counter("pubsub-published-topics");

private Map<ActorRef, Set<T>> topicSubscribers = Map.of();
private Map<Address, java.util.Set<String>> declaredAcks = Map.of();
private java.util.Set<String> allDeclaredAcks = java.util.Set.of();
private Map<ActorRef, Set<Long>> topicSubscribers = Map.of();
private Map<Address, Set<java.lang.String>> declaredAcks = Map.of();
private Set<java.lang.String> allDeclaredAcks = Set.of();

@SuppressWarnings("unused")
private Publisher(final DDataReader<ActorRef, T> ddataReader, final DistributedAcks distributedAcks) {
private Publisher(final DDataReader<ActorRef, String> ddataReader, final DistributedAcks distributedAcks) {
this.ddataReader = ddataReader;
ddataReader.receiveChanges(getSelf());
distributedAcks.receiveDistributedDeclaredAcks(getSelf());
Expand All @@ -90,7 +92,7 @@ public static <T> Props props(final DDataReader<ActorRef, T> ddataReader, final
* @param message the message to publish.
* @return a publish message.
*/
public static Request publish(final Collection<String> topics, final Object message) {
public static Request publish(final Collection<java.lang.String> topics, final Object message) {
return new Publish(topics, message);
}

Expand All @@ -104,9 +106,9 @@ public static Request publish(final Collection<String> topics, final Object mess
* @param dittoHeaders the Ditto headers of any weak acknowledgements to send back.
* @return the request.
*/
public static Request publishWithAck(final Collection<String> topics,
public static Request publishWithAck(final Collection<java.lang.String> topics,
final Object message,
final java.util.Set<AcknowledgementRequest> ackRequests,
final Set<AcknowledgementRequest> ackRequests,
final EntityIdWithType entityId,
final DittoHeaders dittoHeaders) {

Expand All @@ -130,9 +132,9 @@ private void publish(final Publish publish) {

private void publishWithAck(final PublishWithAck publishWithAck) {
final Collection<ActorRef> subscribers = doPublish(publishWithAck.topics, publishWithAck.message);
final java.util.Set<String> subscriberDeclaredAcks = subscribers.stream()
.map(subscriber -> declaredAcks.getOrDefault(subscriber.path().address(), java.util.Set.of()))
.flatMap(java.util.Set::stream)
final Set<java.lang.String> subscriberDeclaredAcks = subscribers.stream()
.map(subscriber -> declaredAcks.getOrDefault(subscriber.path().address(), Set.of()))
.flatMap(Set::stream)
.collect(Collectors.toSet());

final Collection<AcknowledgementLabel> requestedCustomAcks =
Expand All @@ -147,31 +149,50 @@ private void publishWithAck(final PublishWithAck publishWithAck) {
}
}

private Collection<ActorRef> doPublish(final Collection<String> topics, final Object message) {
private Collection<ActorRef> doPublish(final Collection<java.lang.String> topics, final Object message) {
messageCounter.increment();
topicCounter.increment(topics.size());
final List<T> hashes = topics.stream().map(ddataReader::approximate).collect(Collectors.toList());
final List<Long> hashes = topics.stream().map(ddataReader::approximate).collect(Collectors.toList());
final ActorRef sender = getSender();
final Collection<ActorRef> subscribers = ddataReader.getSubscribers(topicSubscribers, hashes);
final Collection<ActorRef> subscribers = getSubscribers(hashes);
subscribers.forEach(subscriber -> subscriber.tell(message, sender));
return subscribers;
}

private void declaredAcksChanged(final RemoteAcksChanged event) {
declaredAcks = event.getMultiMap();
allDeclaredAcks = declaredAcks.values().stream()
.flatMap(java.util.Set::stream)
.flatMap(Set::stream)
.collect(Collectors.toSet());
}

private void topicSubscribersChanged(final Replicator.Changed<?> event) {
topicSubscribers = CollectionConverters.asJava(event.get(ddataReader.getKey()).entries());
final Map<ActorRef, scala.collection.immutable.Set<String>> mmap =
CollectionConverters.asJava(event.get(ddataReader.getKey()).entries());
topicSubscribers = mmap.entrySet()
.stream()
.map(entry -> Pair.create(entry.getKey(), deserializeGroupedHashes(entry.getValue())))
.collect(Collectors.toMap(Pair::first, Pair::second));
}

private void logUnhandled(final Object message) {
log.warning("Unhandled: <{}>", message);
}

private Set<ActorRef> getSubscribers(final Collection<Long> hashes) {
return topicSubscribers.entrySet()
.stream()
.filter(entry -> hashes.stream().anyMatch(entry.getValue()::contains))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
}

private static Set<Long> deserializeGroupedHashes(final scala.collection.immutable.Set<String> strings) {
return CollectionConverters.asJava(strings).stream()
.flatMap(string -> Grouped.fromJson(JsonObject.of(string), JsonValue::asLong).getValues().stream())
.collect(Collectors.toSet());
}

/**
* Requests to a publisher actor.
*/
Expand All @@ -183,10 +204,10 @@ public interface Request {}
*/
private static final class Publish implements Request {

private final Collection<String> topics;
private final Collection<java.lang.String> topics;
private final Object message;

private Publish(final Collection<String> topics, final Object message) {
private Publish(final Collection<java.lang.String> topics, final Object message) {
this.topics = topics;
this.message = message;
}
Expand All @@ -200,14 +221,14 @@ private static final class PublishWithAck implements Request {
private static final AckExtractor<PublishWithAck> ACK_EXTRACTOR =
AckExtractor.of(p -> p.entityId, p -> p.dittoHeaders);

private final Collection<String> topics;
private final Collection<java.lang.String> topics;
private final Object message;
private final java.util.Set<AcknowledgementRequest> ackRequests;
private final Set<AcknowledgementRequest> ackRequests;
private final EntityIdWithType entityId;
private final DittoHeaders dittoHeaders;

private PublishWithAck(final Collection<String> topics, final Object message,
final java.util.Set<AcknowledgementRequest> ackRequests,
private PublishWithAck(final Collection<java.lang.String> topics, final Object message,
final Set<AcknowledgementRequest> ackRequests,
final EntityIdWithType entityId,
final DittoHeaders dittoHeaders) {
this.topics = topics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static <T> Props props(final PubSubConfig config, final ActorRef subscrib
@Override
protected void subscribe(final Subscribe subscribe) {
final boolean changed =
subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(), subscribe.getFilter());
subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(), subscribe.getFilter(), null);
enqueueRequest(subscribe, changed, getSender(), awaitUpdate, awaitUpdateMetric);
if (changed) {
getContext().watch(subscribe.getSubscriber());
Expand Down Expand Up @@ -102,7 +102,7 @@ protected void ddataOpSuccess(final DDataOpSuccess<SubscriptionsReader> opSucces

@Override
protected void tick(final Clock tick) {
performDDataOp(forceUpdate(), localSubscriptionsChanged, nextWriteConsistency)
performDDataOp(localSubscriptionsChanged, nextWriteConsistency)
.handle(handleDDataWriteResult(getSeqNr(), nextWriteConsistency));
moveAwaitUpdateToAwaitAcknowledge();
}
Expand All @@ -113,12 +113,11 @@ private void flushSubAcks(final int seqNr) {
}
}

private CompletionStage<SubscriptionsReader> performDDataOp(final boolean forceUpdate,
final boolean localSubscriptionsChanged,
private CompletionStage<SubscriptionsReader> performDDataOp(final boolean localSubscriptionsChanged,
final Replicator.WriteConsistency writeConsistency) {
final SubscriptionsReader snapshot;
final CompletionStage<Void> ddataOp;
if (!localSubscriptionsChanged && !forceUpdate) {
if (!localSubscriptionsChanged) {
snapshot = subscriptions.snapshot();
ddataOp = CompletableFuture.completedStage(null);
} else if (subscriptions.isEmpty()) {
Expand All @@ -127,7 +126,7 @@ private CompletionStage<SubscriptionsReader> performDDataOp(final boolean forceU
topicMetric.set(0L);
} else {
// export before taking snapshot so that implementations may output incremental update.
final T ddata = subscriptions.export(forceUpdate);
final T ddata = subscriptions.export();
// take snapshot to give to the subscriber; clear accumulated incremental changes.
snapshot = subscriptions.snapshot();
ddataOp = topicsWriter.put(subscriber, ddata, writeConsistency);
Expand Down

This file was deleted.

Loading

0 comments on commit 12800d6

Please sign in to comment.