Skip to content

Commit

Permalink
PubSub: choose a subscriber from the pool for signal publishing.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 3, 2022
1 parent d0c4276 commit bd00213
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ public final class Publisher extends AbstractActor {
private final Counter topicCounter = DittoMetrics.counter("pubsub-published-topics");
private final Counter sentMessagesCounter = DittoMetrics.counter("pubsub-sent-messages");
private final Map<Key<?>, PublisherIndex<Long>> publisherIndexes = new HashMap<>();
private final int subscriberPoolSize;

private PublisherIndex<Long> publisherIndex = PublisherIndex.empty();
private RemoteAcksChanged remoteAcks = RemoteAcksChanged.of(Map.of());

@SuppressWarnings("unused")
private Publisher(final DDataReader<ActorRef, String> ddataReader, final DistributedAcks distributedAcks) {
this.ddataReader = ddataReader;
subscriberPoolSize = distributedAcks.getConfig().getSubscriberPoolSize();
ddataReader.receiveChanges(getSelf());
distributedAcks.receiveDistributedDeclaredAcks(getSelf());
}
Expand Down Expand Up @@ -176,10 +178,14 @@ private List<Pair<ActorRef, PublishSignal>> doPublish(final Collection<String> t
subscribers.stream().map(Pair::first).toList());
}
sentMessagesCounter.increment(subscribers.size());
subscribers.forEach(pair -> pair.first().tell(pair.second(), sender));
subscribers.forEach(pair -> publishSignal(pair.first(), pair.second(), sender));
return subscribers;
}

private void publishSignal(final ActorRef subscriber, final PublishSignal signal, final ActorRef sender) {
Subscriber.chooseSubscriber(subscriber, signal, subscriberPoolSize).tell(signal, sender);
}

private void declaredAcksChanged(final RemoteAcksChanged event) {
remoteAcks = event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.internal.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.internal.utils.pubsub.PubSubFactory;
import org.eclipse.ditto.internal.utils.pubsub.api.LocalAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.PublishSignal;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
Expand All @@ -26,6 +27,7 @@
import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
Expand Down Expand Up @@ -74,6 +76,26 @@ public static <T> Props props(final Class<T> messageClass, final PubSubTopicExtr
return Props.create(Subscriber.class, messageClass, topicExtractor, ackExtractor, distributedAcks);
}

/**
* Choose a subscriber from the subscriber pool based on the hash code of the entity ID of the published signal.
*
* @param parentSubscriber The parent subscriber from the distributed data.
* @param signal The signal to be published.
* @param poolSize The size of the subscriber pool.
* @return An actor selection containing the subscriber in the pool responsible for the signal.
*/
static ActorSelection chooseSubscriber(final ActorRef parentSubscriber, final PublishSignal signal,
final int poolSize) {

if (poolSize > 1) {
final int index = PubSubFactory.hashForPubSub(signal.getSignal().getEntityId()) % poolSize;
if (index > 0) {
return ActorSelection.apply(parentSubscriber, String.valueOf(index));
}
}
return ActorSelection.apply(parentSubscriber, "");
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonParsableCommand;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.JsonParsable;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.base.model.signals.commands.AbstractCommand;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
Expand All @@ -27,13 +34,6 @@
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonParsableCommand;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.JsonParsable;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.AbstractCommand;
import org.eclipse.ditto.base.model.signals.commands.Command;

/**
* Command from Publisher to Subscriber to publish a signal to local subscribers.
Expand All @@ -53,10 +53,10 @@ public final class PublishSignal extends AbstractCommand<PublishSignal> {

private static final String TYPE = TYPE_PREFIX + NAME;

private final Signal<?> signal;
private final SignalWithEntityId<?> signal;
private final Map<String, Integer> groups;

private PublishSignal(final Signal<?> signal, final Map<String, Integer> groups) {
private PublishSignal(final SignalWithEntityId<?> signal, final Map<String, Integer> groups) {
super(TYPE, signal.getDittoHeaders(), Category.MODIFY);
this.signal = signal;
this.groups = groups;
Expand All @@ -69,7 +69,7 @@ private PublishSignal(final Signal<?> signal, final Map<String, Integer> groups)
* @param groups relation between the groups where the signal is published to and the size of each group.
* @return the command to do it.
*/
public static PublishSignal of(final Signal<?> signal, final Map<String, Integer> groups) {
public static PublishSignal of(final SignalWithEntityId<?> signal, final Map<String, Integer> groups) {
return new PublishSignal(signal, groups);
}

Expand All @@ -87,8 +87,8 @@ public static PublishSignal fromJson(final JsonObject jsonObject,
final JsonParsable.ParseInnerJson parseInnerJson) {

try {
final Signal<?> signal =
(Signal<?>) parseInnerJson.parseInnerJson(jsonObject.getValueOrThrow(JsonFields.SIGNAL));
final SignalWithEntityId<?> signal = (SignalWithEntityId<?>) parseInnerJson.parseInnerJson(
jsonObject.getValueOrThrow(JsonFields.SIGNAL));
final Map<String, Integer> groups = jsonObject.getValueOrThrow(JsonFields.GROUPS)
.stream()
.collect(Collectors.toMap(JsonField::getKeyName, field -> field.getValue().asInt()));
Expand All @@ -101,7 +101,7 @@ public static PublishSignal fromJson(final JsonObject jsonObject,
/**
* @return the signal to be published.
*/
public Signal<?> getSignal() {
public SignalWithEntityId<?> getSignal() {
return signal;
}

Expand Down Expand Up @@ -155,8 +155,7 @@ public String getResourceType() {

@Override
public boolean equals(final Object other) {
if (other instanceof PublishSignal) {
final PublishSignal that = (PublishSignal) other;
if (other instanceof final PublishSignal that) {
return Objects.equals(signal, that.signal) && Objects.equals(groups, that.groups);
} else {
return false;
Expand Down

0 comments on commit bd00213

Please sign in to comment.