Skip to content

Commit

Permalink
[eclipse-ditto#964] subscribe for policy notifications in gateway and…
Browse files Browse the repository at this point in the history
… connectivity.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Feb 15, 2021
1 parent ae1702d commit 60e710d
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package org.eclipse.ditto.model.policies;

import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;

Expand All @@ -21,19 +20,19 @@
import org.eclipse.ditto.model.base.entity.id.DefaultNamespacedEntityId;
import org.eclipse.ditto.model.base.entity.id.NamespacedEntityId;
import org.eclipse.ditto.model.base.entity.id.NamespacedEntityIdInvalidException;
import org.eclipse.ditto.model.base.entity.id.NamespacedEntityIdWithType;
import org.eclipse.ditto.model.base.entity.type.EntityType;

/**
* Java representation of a policy ID.
*/
@Immutable
public final class PolicyId implements NamespacedEntityId {
public final class PolicyId extends NamespacedEntityIdWithType {

private static final PolicyId DUMMY_ID = PolicyId.of(DefaultNamespacedEntityId.dummy());

private final NamespacedEntityId entityId;

private PolicyId(final NamespacedEntityId entityId) {
this.entityId = entityId;
super(entityId);
}

/**
Expand Down Expand Up @@ -97,40 +96,7 @@ public static PolicyId dummy() {
}

@Override
public String getName() {
return entityId.getName();
}

@Override
public String getNamespace() {
return entityId.getNamespace();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final PolicyId that = (PolicyId) o;
return Objects.equals(entityId, that.entityId);
public EntityType getEntityType() {
return PolicyConstants.ENTITY_TYPE;
}

@Override
public int hashCode() {
return Objects.hash(entityId);
}

@Override
public String toString() {
return entityId.toString();
}

@Override
public boolean isDummy() {
return entityId.isDummy();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void testImmutability() {

@Test
public void testEqualsAndHashcode() {
EqualsVerifier.forClass(PolicyId.class).verify();
EqualsVerifier.forClass(PolicyId.class).withRedefinedSuperclass().verify();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,20 @@ public void removeSubscriber(final ActorRef subscriber) {
public CompletionStage<Void> updateLiveSubscriptions(final Collection<StreamingType> types,
final Collection<String> topics, final ActorRef subscriber) {
doDelegate(d -> d.updateLiveSubscriptions(types, topics, subscriber));
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedStage(null);
}

@Override
public CompletionStage<Void> removeTwinSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
doDelegate(d -> d.removeTwinSubscriber(subscriber, topics));
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedStage(null);
}

@Override
public CompletionStage<Void> removePolicyNotificationSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
return CompletableFuture.completedStage(null);
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions services/connectivity/messaging/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ akka {
// dead lettters log disabled because actors are being killed all the time
akka.log-dead-letters = 0
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.cluster.roles = ["thing-event-aware", "live-signal-aware", "acks-aware"]
akka.cluster.roles = ["thing-event-aware", "live-signal-aware", "acks-aware", "policy-notification-aware"]

ditto {
mapping-strategy.implementation = "org.eclipse.ditto.services.models.connectivity.ConnectivityMappingStrategies"
Expand Down Expand Up @@ -287,7 +287,8 @@ akka {
"connectivity",
"thing-event-aware",
"live-signal-aware",
"acks-aware"
"acks-aware",
"policy-notification-aware"
]

distributed-data {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,19 @@ public void removeSubscriber(final ActorRef subscriber) {
public CompletionStage<Void> updateLiveSubscriptions(
final Collection<StreamingType> types,
final Collection<String> topics, final ActorRef subscriber) {
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedStage(null);
}

@Override
public CompletionStage<Void> removeTwinSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedStage(null);
}

@Override
public CompletionStage<Void> removePolicyNotificationSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
return CompletableFuture.completedStage(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.START_SEND_LIVE_COMMANDS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.START_SEND_LIVE_EVENTS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.START_SEND_MESSAGES;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.START_SEND_POLICY_NOTIFICATIONS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_EVENTS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_LIVE_COMMANDS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_LIVE_EVENTS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_MESSAGES;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_POLICY_NOTIFICATIONS;

import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -765,6 +767,11 @@ private static String streamingAckToString(final StreamingAck streamingAck) {
case LIVE_EVENTS:
protocolMessage = subscribed ? START_SEND_LIVE_EVENTS.toString() : STOP_SEND_LIVE_EVENTS.toString();
break;
case POLICY_NOTIFICATIONS:
protocolMessage = subscribed
? START_SEND_POLICY_NOTIFICATIONS.toString()
: STOP_SEND_POLICY_NOTIFICATIONS.toString();
break;
default:
throw new IllegalArgumentException("Unknown streamingType: " + streamingType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,24 @@ private Receive createPubSubBehavior() {
final ConfirmUnsubscription unsubscribeConfirmation =
new ConfirmUnsubscription(stopStreaming.getStreamingType());
final Collection<StreamingType> currentStreamingTypes = streamingSessions.keySet();
if (stopStreaming.getStreamingType() != StreamingType.EVENTS) {
dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(), getSelf())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
} else {
dittoProtocolSub.removeTwinSubscriber(getSelf(),
authorizationContext.getAuthorizationSubjectIds())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
switch (stopStreaming.getStreamingType()) {
case EVENTS:
dittoProtocolSub.removeTwinSubscriber(getSelf(),
authorizationContext.getAuthorizationSubjectIds())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
break;
case POLICY_NOTIFICATIONS:
dittoProtocolSub.removePolicyNotificationSubscriber(getSelf(),
authorizationContext.getAuthorizationSubjectIds())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
break;
case LIVE_COMMANDS:
case LIVE_EVENTS:
case MESSAGES:
default:
dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(), getSelf())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
}
})
.match(ConfirmSubscription.class, msg -> confirmSubscription(msg.getStreamingType()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
akka.cluster.roles += "live-signal-aware"
akka.cluster.roles += "thing-event-aware"
akka.cluster.roles += "acks-aware"
akka.cluster.roles += "policy-notification-aware"
4 changes: 4 additions & 0 deletions services/policies/persistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-utils-persistent-actors</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-utils-pubsub</artifactId>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.ditto.services.utils.persistence.mongo.MongoHealthChecker;
import org.eclipse.ditto.services.utils.persistence.mongo.MongoMetricsReporter;
import org.eclipse.ditto.services.utils.persistence.mongo.config.TagsConfig;
import org.eclipse.ditto.services.utils.pubsub.PolicyNotificationPubSubFactory;
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;

import akka.actor.ActorRef;
Expand Down Expand Up @@ -68,6 +69,10 @@ private PoliciesRootActor(final PoliciesConfig policiesConfig,
final ClusterShardingSettings shardingSettings =
ClusterShardingSettings.create(actorSystem).withRole(CLUSTER_ROLE);

// Start distributed data replicator even though it is not used for now.
// TODO: use the DistributedSub for notification publication
PolicyNotificationPubSubFactory.of(actorSystem).startDistributedPub();

final Props policySupervisorProps = PolicySupervisorActor.props(pubSubMediator, snapshotAdapter);

final TagsConfig tagsConfig = policiesConfig.getTagsConfig();
Expand Down
3 changes: 2 additions & 1 deletion services/policies/starter/src/main/resources/policies.conf
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ akka {
}

roles = [
"policies"
"policies",
"policy-notification-aware"
]
}

Expand Down
3 changes: 2 additions & 1 deletion services/utils/config/src/main/resources/ditto-cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ ditto.cluster {
"blocked-namespaces-aware",
"thing-event-aware",
"live-signal-aware",
"acks-aware"
"acks-aware",
"policy-notification-aware"
]
}
4 changes: 4 additions & 0 deletions services/utils/pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>ditto-signals-events-things</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-signals-notifications-policies</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>ditto-services-utils-akka</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ CompletionStage<Void> updateLiveSubscriptions(Collection<StreamingType> types, C
*/
CompletionStage<Void> removeTwinSubscriber(ActorRef subscriber, Collection<String> topics);

/**
* Remove a subscriber from the policy notifications only.
*
* @param subscriber whom to remove.
* @param topics what were the subscribed topics.
* @return future that completes or fails according to the acknowledgement.
*/
CompletionStage<Void> removePolicyNotificationSubscriber(ActorRef subscriber, Collection<String> topics);

/**
* Declare acknowledgement labels for a subscriber.
* Declared acknowledgement labels are globally unique for each subscriber.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ final class DittoProtocolSubImpl implements DittoProtocolSub {

private final DistributedSub liveSignalSub;
private final DistributedSub twinEventSub;
private final DistributedSub policyNotificationSub;
private final DistributedAcks distributedAcks;

private DittoProtocolSubImpl(final DistributedSub liveSignalSub,
final DistributedSub twinEventSub,
final DistributedSub policyNotificationSub,
final DistributedAcks distributedAcks) {
this.liveSignalSub = liveSignalSub;
this.twinEventSub = twinEventSub;
this.policyNotificationSub = policyNotificationSub;
this.distributedAcks = distributedAcks;
}

Expand All @@ -53,7 +56,8 @@ static DittoProtocolSubImpl of(final ActorSystem system, final DistributedAcks d
LiveSignalPubSubFactory.of(system, distributedAcks).startDistributedSub();
final DistributedSub twinEventSub =
ThingEventPubSubFactory.readSubjectsOnly(system, distributedAcks).startDistributedSub();
return new DittoProtocolSubImpl(liveSignalSub, twinEventSub, distributedAcks);
final DistributedSub policyNotificationSub = PolicyNotificationPubSubFactory.of(system).startDistributedSub();
return new DittoProtocolSubImpl(liveSignalSub, twinEventSub, policyNotificationSub, distributedAcks);
}

@Override
Expand All @@ -68,6 +72,9 @@ public CompletionStage<Void> subscribe(final Collection<StreamingType> types,
: nop,
hasTwinEvents -> hasTwinEvents
? twinEventSub.subscribeWithFilterAndGroup(topics, subscriber, null, group)
: nop,
hasPolicyNotification -> hasPolicyNotification
? policyNotificationSub.subscribeWithFilterAndGroup(topics, subscriber, null, group)
: nop
);
}
Expand All @@ -76,6 +83,7 @@ public CompletionStage<Void> subscribe(final Collection<StreamingType> types,
public void removeSubscriber(final ActorRef subscriber) {
liveSignalSub.removeSubscriber(subscriber);
twinEventSub.removeSubscriber(subscriber);
policyNotificationSub.removeSubscriber(subscriber);
distributedAcks.removeSubscriber(subscriber);
}

Expand All @@ -88,7 +96,8 @@ public CompletionStage<Void> updateLiveSubscriptions(final Collection<StreamingT
liveTypes -> !liveTypes.isEmpty()
? liveSignalSub.subscribeWithFilterAndGroup(topics, subscriber, toFilter(liveTypes), null)
: liveSignalSub.unsubscribeWithAck(topics, subscriber),
hasTwinEvents -> CompletableFuture.completedFuture(null)
hasTwinEvents -> CompletableFuture.completedStage(null),
hasPolicyNotification -> CompletableFuture.completedStage(null)
);
}

Expand All @@ -97,6 +106,12 @@ public CompletionStage<Void> removeTwinSubscriber(final ActorRef subscriber, fin
return twinEventSub.unsubscribeWithAck(topics, subscriber).thenApply(ack -> null);
}

@Override
public CompletionStage<Void> removePolicyNotificationSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
return policyNotificationSub.unsubscribeWithAck(topics, subscriber).thenApply(ack -> null);
}

@Override
public CompletionStage<Void> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels,
Expand Down Expand Up @@ -132,19 +147,25 @@ public void removeAcknowledgementLabelDeclaration(final ActorRef subscriber) {

private CompletionStage<Void> partitionByStreamingTypes(final Collection<StreamingType> types,
final Function<Set<StreamingType>, CompletionStage<?>> onLiveSignals,
final Function<Boolean, CompletionStage<?>> onTwinEvents) {
final Function<Boolean, CompletionStage<?>> onTwinEvents,
final Function<Boolean, CompletionStage<?>> onPolicyNotifications) {
final Set<StreamingType> liveTypes;
final boolean hasTwinEvents;
final boolean hasPolicyNotifications;
if (types.isEmpty()) {
liveTypes = Collections.emptySet();
hasTwinEvents = false;
hasPolicyNotifications = false;
} else {
liveTypes = EnumSet.copyOf(types);
hasTwinEvents = liveTypes.remove(StreamingType.EVENTS);
hasPolicyNotifications = liveTypes.remove(StreamingType.POLICY_NOTIFICATIONS);
}
final CompletableFuture<?> liveStage = onLiveSignals.apply(liveTypes).toCompletableFuture();
final CompletableFuture<?> twinStage = onTwinEvents.apply(hasTwinEvents).toCompletableFuture();
return CompletableFuture.allOf(liveStage, twinStage);
final CompletableFuture<?> policyNotificationStage =
onPolicyNotifications.apply(hasPolicyNotifications).toCompletableFuture();
return CompletableFuture.allOf(liveStage, twinStage, policyNotificationStage);
}

private static Predicate<Collection<String>> toFilter(final Collection<StreamingType> streamingTypes) {
Expand Down

0 comments on commit 60e710d

Please sign in to comment.