Skip to content

Commit

Permalink
Issue #878: turn DittoProtocolSub into an actor system extension.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 24, 2020
1 parent 542384f commit 6bf7e96
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
final ActorRef proxyActor =
startChildActor(ConnectivityProxyActor.ACTOR_NAME, ConnectivityProxyActor.props(conciergeForwarder));

final DittoProtocolSub dittoProtocolSub =
DittoProtocolSub.of(getContext(), DistributedAcks.create(getContext()));
final DittoProtocolSub dittoProtocolSub = DittoProtocolSub.get(actorSystem);
final Props connectionSupervisorProps =
getConnectionSupervisorProps(dittoProtocolSub, proxyActor, commandValidator, pubSubMediator);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import org.eclipse.ditto.services.utils.health.cluster.ClusterStatus;
import org.eclipse.ditto.services.utils.health.routes.StatusRoute;
import org.eclipse.ditto.services.utils.protocol.ProtocolAdapterProvider;
import org.eclipse.ditto.services.utils.pubsub.DistributedAcks;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -131,8 +130,7 @@ private GatewayRootActor(final GatewayConfig gatewayConfig, final ActorRef pubSu

pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());

final DittoProtocolSub dittoProtocolSub =
DittoProtocolSub.of(getContext(), DistributedAcks.create(getContext()));
final DittoProtocolSub dittoProtocolSub = DittoProtocolSub.get(actorSystem);

final AuthenticationConfig authenticationConfig = gatewayConfig.getAuthenticationConfig();
final DefaultHttpClientFacade httpClient =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.utils.pubsub.DistributedAcks;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Extension;

/**
* Subscriptions for Ditto protocol channels.
*/
public interface DittoProtocolSub {
public interface DittoProtocolSub extends Extension {

/**
* Subscribe for each streaming type the same collection of topics.
Expand Down Expand Up @@ -106,13 +106,13 @@ CompletionStage<Void> declareAcknowledgementLabels(Collection<AcknowledgementLab
void removeAcknowledgementLabelDeclaration(ActorRef subscriber);

/**
* Create {@code DittoProtocolSub} for an actor system.
* Get the {@code DittoProtocolSub} for an actor system.
*
* @param context context of the actor under which the subscriber actors are started.
* @param distributedAcks the distributed acks interface.
* @return the {@code DittoProtocolSub}.
* @param system the actor system.
* @return the {@code DittoProtocolSub} extension.
*/
static DittoProtocolSub of(final ActorContext context, final DistributedAcks distributedAcks) {
return DittoProtocolSubImpl.of(context, distributedAcks);
static DittoProtocolSub get(final ActorSystem system) {
return DittoProtocolSubImpl.ExtensionId.INSTANCE.get(system);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import org.eclipse.ditto.services.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.services.utils.pubsub.DistributedSub;

import akka.actor.ActorContext;
import akka.actor.AbstractExtensionId;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;

/**
* Default implementation of {@link DittoProtocolSub}.
Expand All @@ -50,11 +52,11 @@ private DittoProtocolSubImpl(final DistributedSub liveSignalSub,
this.distributedAcks = distributedAcks;
}

static DittoProtocolSubImpl of(final ActorContext context, final DistributedAcks distributedAcks) {
static DittoProtocolSubImpl of(final ActorSystem system, final DistributedAcks distributedAcks) {
final DistributedSub liveSignalSub =
LiveSignalPubSubFactory.of(context, distributedAcks).startDistributedSub();
LiveSignalPubSubFactory.of(system, distributedAcks).startDistributedSub();
final DistributedSub twinEventSub =
ThingEventPubSubFactory.readSubjectsOnly(context, distributedAcks).startDistributedSub();
ThingEventPubSubFactory.readSubjectsOnly(system, distributedAcks).startDistributedSub();
return new DittoProtocolSubImpl(liveSignalSub, twinEventSub, distributedAcks);
}

Expand Down Expand Up @@ -154,4 +156,16 @@ private static Predicate<Collection<String>> toFilter(final Collection<Streaming
return topics -> topics.stream().anyMatch(streamingTypeTopics::contains);
}

static final class ExtensionId extends AbstractExtensionId<DittoProtocolSub> {

static final ExtensionId INSTANCE = new ExtensionId();

private ExtensionId() {}

@Override
public DittoProtocolSub createExtension(final ExtendedActorSystem system) {
final DistributedAcks distributedAcks = DistributedAcks.create(system);
return of(system, distributedAcks);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.eclipse.ditto.signals.base.Signal;

import akka.actor.ActorContext;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;

/**
* Pub-sub factory for live signals.
Expand All @@ -37,21 +39,34 @@ final class LiveSignalPubSubFactory extends AbstractPubSubFactory<Signal<?>> {
private static final DDataProvider PROVIDER = DDataProvider.of("live-signal-aware");

@SuppressWarnings("unchecked")
private LiveSignalPubSubFactory(final ActorContext context, final PubSubTopicExtractor<Signal<?>> topicExtractor,
private LiveSignalPubSubFactory(final ActorRefFactory actorRefFactory,
final ActorSystem actorSystem,
final PubSubTopicExtractor<Signal<?>> topicExtractor,
final DistributedAcks distributedAcks) {
super(context, (Class<Signal<?>>) (Object) Signal.class, topicExtractor, PROVIDER, ACK_EXTRACTOR,
distributedAcks);
super(actorRefFactory, actorSystem, (Class<Signal<?>>) (Object) Signal.class, topicExtractor, PROVIDER,
ACK_EXTRACTOR, distributedAcks);
}

/**
* Create a pubsub factory for live signals from an actor system and its shard region extractor.
* Create a pubsub factory for live signals from an actor context.
*
* @param context context of the actor under which the publisher and subscriber actors are started.
* @param distributedAcks the distributed acks interface.
* @return the thing
*/
public static LiveSignalPubSubFactory of(final ActorContext context, final DistributedAcks distributedAcks) {
return new LiveSignalPubSubFactory(context, topicExtractor(), distributedAcks);
return new LiveSignalPubSubFactory(context, context.system(), topicExtractor(), distributedAcks);
}

/**
* Create a pubsub factory for live signals from an actor system.
*
* @param system the actor system.
* @param distributedAcks the distributed acks interface.
* @return the thing
*/
public static LiveSignalPubSubFactory of(final ActorSystem system, final DistributedAcks distributedAcks) {
return new LiveSignalPubSubFactory(system, system, topicExtractor(), distributedAcks);
}

private static Collection<String> getStreamingTypeTopic(final Signal<?> signal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.eclipse.ditto.signals.events.things.ThingEvent;

import akka.actor.ActorContext;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;

/**
* Pub-sub factory for thing events.
Expand All @@ -37,12 +39,13 @@ public final class ThingEventPubSubFactory extends AbstractPubSubFactory<ThingEv
private static final DDataProvider PROVIDER = DDataProvider.of("thing-event-aware");

@SuppressWarnings({"unchecked"})
private ThingEventPubSubFactory(final ActorContext context,
private ThingEventPubSubFactory(final ActorRefFactory actorRefFactory,
final ActorSystem actorSystem,
final PubSubTopicExtractor<ThingEvent<?>> topicExtractor,
final DistributedAcks distributedAcks) {

super(context, (Class<ThingEvent<?>>) (Object) ThingEvent.class, topicExtractor, PROVIDER, ACK_EXTRACTOR,
distributedAcks);
super(actorRefFactory, actorSystem, (Class<ThingEvent<?>>) (Object) ThingEvent.class, topicExtractor, PROVIDER,
ACK_EXTRACTOR, distributedAcks);
}

/**
Expand All @@ -57,19 +60,20 @@ public static ThingEventPubSubFactory of(final ActorContext context,
final ShardRegionExtractor shardRegionExtractor,
final DistributedAcks distributedAcks) {

return new ThingEventPubSubFactory(context, toTopicExtractor(shardRegionExtractor), distributedAcks);
return new ThingEventPubSubFactory(context, context.system(), toTopicExtractor(shardRegionExtractor),
distributedAcks);
}

/**
* Create a pubsub factory for thing events ignoring shard ID topics.
*
* @param context context of the actor under which publisher and subscriber actors are created.
* @param system the actor system.
* @param distributedAcks the distributed acks interface.
* @return the thing event pub-sub factory.
*/
public static ThingEventPubSubFactory readSubjectsOnly(final ActorContext context,
public static ThingEventPubSubFactory readSubjectsOnly(final ActorSystem system,
final DistributedAcks distributedAcks) {
return new ThingEventPubSubFactory(context, readSubjectOnlyExtractor(), distributedAcks);
return new ThingEventPubSubFactory(system, system, readSubjectOnlyExtractor(), distributedAcks);
}

/**
Expand All @@ -85,7 +89,7 @@ public static ThingEventPubSubFactory shardIdOnly(final ActorContext context, fi

final PubSubTopicExtractor<ThingEvent<?>> topicExtractor =
shardIdOnlyExtractor(ShardRegionExtractor.of(numberOfShards, context.system()));
return new ThingEventPubSubFactory(context, topicExtractor, distributedAcks);
return new ThingEventPubSubFactory(context, context.system(), topicExtractor, distributedAcks);
}

private static PubSubTopicExtractor<ThingEvent<?>> readSubjectOnlyExtractor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,29 @@ public abstract class AbstractPubSubFactory<T extends Signal<?>> implements PubS

/**
* Create a pub-sub factory.
* @param context context of the actor under which publisher and subscriber actors are created.
* @param actorRefFactory context of the actor under which publisher and subscriber actors are created.
* @param actorSystem the actor system.
* @param messageClass the class of messages to publish and subscribe for.
* @param topicExtractor a function extracting from each message the topics it was published at.
* @param provider provider of the underlying ddata extension.
* @param ackExtractor extractor of acknowledgement-related information from a message.
* @param distributedAcks a second ddata for declared acknowledgement labels.
*/
protected AbstractPubSubFactory(final ActorContext context,
protected AbstractPubSubFactory(final ActorRefFactory actorRefFactory,
final ActorSystem actorSystem,
final Class<T> messageClass,
final PubSubTopicExtractor<T> topicExtractor,
final DDataProvider provider,
final AckExtractor<T> ackExtractor,
final DistributedAcks distributedAcks) {

this.actorRefFactory = context;
this.actorRefFactory = actorRefFactory;
this.messageClass = messageClass;
factoryId = provider.clusterRole;
this.topicExtractor = topicExtractor;
this.ackExtractor = ackExtractor;
ddataConfig = provider.getConfig(context.system());
ddata = CompressedDData.of(context.system(), provider);
ddataConfig = provider.getConfig(actorSystem);
ddata = CompressedDData.of(actorSystem, provider);
this.distributedAcks = distributedAcks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
* Interface to access the local and distributed data of declared acknowledgement labels.
Expand Down Expand Up @@ -99,6 +100,17 @@ static DistributedAcks create(final ActorContext context) {
return DistributedAcksImpl.create(context);
}

/**
* Start AcksSupervisor under the user guardian and expose a DistributedAcks interface.
* Precondition: the cluster member has the role {@code "acks-aware"}.
*
* @param system the actor system.
* @return the DistributedAcks interface.
*/
static DistributedAcks create(final ActorSystem system) {
return DistributedAcksImpl.create(system, system);
}

/**
* Create a dummy {@code DistributedAcks} interface not backed by a distributed data.
* Useful for cluster members not participating in signal publication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;

Expand All @@ -53,17 +55,22 @@ private DistributedAcksImpl(final DistributedDataConfig config, final ActorRef a
}

static DistributedAcks create(final ActorContext actorContext) {
return create(actorContext, actorContext.system());
}

static DistributedAcks create(final ActorRefFactory actorRefFactory, final ActorSystem actorSystem) {
final LiteralDDataProvider provider = LiteralDDataProvider.of(CLUSTER_ROLE, "acks");
return create(actorContext, CLUSTER_ROLE, provider);
return create(actorRefFactory, actorSystem, CLUSTER_ROLE, provider);
}

static DistributedAcks create(final ActorContext actorContext,
static DistributedAcks create(final ActorRefFactory actorRefFactory,
final ActorSystem system,
final String clusterRole,
final LiteralDDataProvider provider) {
final String supervisorName = clusterRole + "-ack-supervisor";
final Props props = AckSupervisor.props(LiteralDData.of(actorContext.system(), provider));
final ActorRef supervisor = actorContext.actorOf(props, supervisorName);
final DistributedDataConfig config = provider.getConfig(actorContext.system());
final Props props = AckSupervisor.props(LiteralDData.of(system, provider));
final ActorRef supervisor = actorRefFactory.actorOf(props, supervisorName);
final DistributedDataConfig config = provider.getConfig(system);
return new DistributedAcksImpl(config, supervisor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ private TestPubSubFactory(final ActorContext context,
final PubSubTopicExtractor<Acknowledgement> topicExtractor,
final AckExtractor<Acknowledgement> ackExtractor,
final DistributedAcks distributedAcks) {
super(context, Acknowledgement.class, topicExtractor, PROVIDER, ackExtractor, distributedAcks);
super(context, context.system(), Acknowledgement.class, topicExtractor, PROVIDER, ackExtractor,
distributedAcks);
final PubSubConfig config = PubSubConfig.of(context.system().settings().config().getConfig("ditto.pubsub"));
seeds = Hashes.digestStringsToIntegers(config.getSeed(), Hashes.HASH_FAMILY_SIZE);
}

static DistributedAcks startDistributedAcks(final ActorContext context) {
return DistributedAcksImpl.create(context, "dc-default", ACKS_PROVIDER);
return DistributedAcksImpl.create(context, context.system(), "dc-default", ACKS_PROVIDER);
}

static TestPubSubFactory of(final ActorContext context, final AckExtractor<Acknowledgement> ackExtractor,
Expand Down

0 comments on commit 6bf7e96

Please sign in to comment.