Skip to content

Commit

Permalink
[#964] subscribe for policy notifications in gateway and connectivity.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Feb 15, 2021
1 parent ae1702d commit 60e710d
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 61 deletions.
Expand Up @@ -12,7 +12,6 @@
*/
package org.eclipse.ditto.model.policies;

import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;

Expand All @@ -21,19 +20,19 @@
import org.eclipse.ditto.model.base.entity.id.DefaultNamespacedEntityId;
import org.eclipse.ditto.model.base.entity.id.NamespacedEntityId;
import org.eclipse.ditto.model.base.entity.id.NamespacedEntityIdInvalidException;
import org.eclipse.ditto.model.base.entity.id.NamespacedEntityIdWithType;
import org.eclipse.ditto.model.base.entity.type.EntityType;

/**
* Java representation of a policy ID.
*/
@Immutable
public final class PolicyId implements NamespacedEntityId {
public final class PolicyId extends NamespacedEntityIdWithType {

private static final PolicyId DUMMY_ID = PolicyId.of(DefaultNamespacedEntityId.dummy());

private final NamespacedEntityId entityId;

private PolicyId(final NamespacedEntityId entityId) {
this.entityId = entityId;
super(entityId);
}

/**
Expand Down Expand Up @@ -97,40 +96,7 @@ public static PolicyId dummy() {
}

@Override
public String getName() {
return entityId.getName();
}

@Override
public String getNamespace() {
return entityId.getNamespace();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final PolicyId that = (PolicyId) o;
return Objects.equals(entityId, that.entityId);
public EntityType getEntityType() {
return PolicyConstants.ENTITY_TYPE;
}

@Override
public int hashCode() {
return Objects.hash(entityId);
}

@Override
public String toString() {
return entityId.toString();
}

@Override
public boolean isDummy() {
return entityId.isDummy();
}

}
Expand Up @@ -32,7 +32,7 @@ public void testImmutability() {

@Test
public void testEqualsAndHashcode() {
EqualsVerifier.forClass(PolicyId.class).verify();
EqualsVerifier.forClass(PolicyId.class).withRedefinedSuperclass().verify();
}

@Test
Expand Down
Expand Up @@ -250,14 +250,20 @@ public void removeSubscriber(final ActorRef subscriber) {
public CompletionStage<Void> updateLiveSubscriptions(final Collection<StreamingType> types,
final Collection<String> topics, final ActorRef subscriber) {
doDelegate(d -> d.updateLiveSubscriptions(types, topics, subscriber));
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedStage(null);
}

@Override
public CompletionStage<Void> removeTwinSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
doDelegate(d -> d.removeTwinSubscriber(subscriber, topics));
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedStage(null);
}

@Override
public CompletionStage<Void> removePolicyNotificationSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
return CompletableFuture.completedStage(null);
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions services/connectivity/messaging/src/test/resources/test.conf
Expand Up @@ -16,7 +16,7 @@ akka {
// dead lettters log disabled because actors are being killed all the time
akka.log-dead-letters = 0
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.cluster.roles = ["thing-event-aware", "live-signal-aware", "acks-aware"]
akka.cluster.roles = ["thing-event-aware", "live-signal-aware", "acks-aware", "policy-notification-aware"]

ditto {
mapping-strategy.implementation = "org.eclipse.ditto.services.models.connectivity.ConnectivityMappingStrategies"
Expand Down Expand Up @@ -287,7 +287,8 @@ akka {
"connectivity",
"thing-event-aware",
"live-signal-aware",
"acks-aware"
"acks-aware",
"policy-notification-aware"
]

distributed-data {
Expand Down
Expand Up @@ -154,13 +154,19 @@ public void removeSubscriber(final ActorRef subscriber) {
public CompletionStage<Void> updateLiveSubscriptions(
final Collection<StreamingType> types,
final Collection<String> topics, final ActorRef subscriber) {
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedStage(null);
}

@Override
public CompletionStage<Void> removeTwinSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedStage(null);
}

@Override
public CompletionStage<Void> removePolicyNotificationSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
return CompletableFuture.completedStage(null);
}

@Override
Expand Down
Expand Up @@ -18,10 +18,12 @@
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.START_SEND_LIVE_COMMANDS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.START_SEND_LIVE_EVENTS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.START_SEND_MESSAGES;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.START_SEND_POLICY_NOTIFICATIONS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_EVENTS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_LIVE_COMMANDS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_LIVE_EVENTS;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_MESSAGES;
import static org.eclipse.ditto.services.gateway.endpoints.routes.websocket.ProtocolMessageType.STOP_SEND_POLICY_NOTIFICATIONS;

import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -765,6 +767,11 @@ private static String streamingAckToString(final StreamingAck streamingAck) {
case LIVE_EVENTS:
protocolMessage = subscribed ? START_SEND_LIVE_EVENTS.toString() : STOP_SEND_LIVE_EVENTS.toString();
break;
case POLICY_NOTIFICATIONS:
protocolMessage = subscribed
? START_SEND_POLICY_NOTIFICATIONS.toString()
: STOP_SEND_POLICY_NOTIFICATIONS.toString();
break;
default:
throw new IllegalArgumentException("Unknown streamingType: " + streamingType);
}
Expand Down
Expand Up @@ -331,14 +331,24 @@ private Receive createPubSubBehavior() {
final ConfirmUnsubscription unsubscribeConfirmation =
new ConfirmUnsubscription(stopStreaming.getStreamingType());
final Collection<StreamingType> currentStreamingTypes = streamingSessions.keySet();
if (stopStreaming.getStreamingType() != StreamingType.EVENTS) {
dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(), getSelf())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
} else {
dittoProtocolSub.removeTwinSubscriber(getSelf(),
authorizationContext.getAuthorizationSubjectIds())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
switch (stopStreaming.getStreamingType()) {
case EVENTS:
dittoProtocolSub.removeTwinSubscriber(getSelf(),
authorizationContext.getAuthorizationSubjectIds())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
break;
case POLICY_NOTIFICATIONS:
dittoProtocolSub.removePolicyNotificationSubscriber(getSelf(),
authorizationContext.getAuthorizationSubjectIds())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
break;
case LIVE_COMMANDS:
case LIVE_EVENTS:
case MESSAGES:
default:
dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(), getSelf())
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
}
})
.match(ConfirmSubscription.class, msg -> confirmSubscription(msg.getStreamingType()))
Expand Down
Expand Up @@ -5,3 +5,4 @@
akka.cluster.roles += "live-signal-aware"
akka.cluster.roles += "thing-event-aware"
akka.cluster.roles += "acks-aware"
akka.cluster.roles += "policy-notification-aware"
4 changes: 4 additions & 0 deletions services/policies/persistence/pom.xml
Expand Up @@ -85,6 +85,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-utils-persistent-actors</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-utils-pubsub</artifactId>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.ditto.services.utils.persistence.mongo.MongoHealthChecker;
import org.eclipse.ditto.services.utils.persistence.mongo.MongoMetricsReporter;
import org.eclipse.ditto.services.utils.persistence.mongo.config.TagsConfig;
import org.eclipse.ditto.services.utils.pubsub.PolicyNotificationPubSubFactory;
import org.eclipse.ditto.signals.commands.devops.RetrieveStatisticsDetails;

import akka.actor.ActorRef;
Expand Down Expand Up @@ -68,6 +69,10 @@ private PoliciesRootActor(final PoliciesConfig policiesConfig,
final ClusterShardingSettings shardingSettings =
ClusterShardingSettings.create(actorSystem).withRole(CLUSTER_ROLE);

// Start distributed data replicator even though it is not used for now.
// TODO: use the DistributedSub for notification publication
PolicyNotificationPubSubFactory.of(actorSystem).startDistributedPub();

final Props policySupervisorProps = PolicySupervisorActor.props(pubSubMediator, snapshotAdapter);

final TagsConfig tagsConfig = policiesConfig.getTagsConfig();
Expand Down
3 changes: 2 additions & 1 deletion services/policies/starter/src/main/resources/policies.conf
Expand Up @@ -89,7 +89,8 @@ akka {
}

roles = [
"policies"
"policies",
"policy-notification-aware"
]
}

Expand Down
3 changes: 2 additions & 1 deletion services/utils/config/src/main/resources/ditto-cluster.conf
Expand Up @@ -8,6 +8,7 @@ ditto.cluster {
"blocked-namespaces-aware",
"thing-event-aware",
"live-signal-aware",
"acks-aware"
"acks-aware",
"policy-notification-aware"
]
}
4 changes: 4 additions & 0 deletions services/utils/pubsub/pom.xml
Expand Up @@ -33,6 +33,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>ditto-signals-events-things</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-signals-notifications-policies</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>ditto-services-utils-akka</artifactId>
Expand Down
Expand Up @@ -80,6 +80,15 @@ CompletionStage<Void> updateLiveSubscriptions(Collection<StreamingType> types, C
*/
CompletionStage<Void> removeTwinSubscriber(ActorRef subscriber, Collection<String> topics);

/**
* Remove a subscriber from the policy notifications only.
*
* @param subscriber whom to remove.
* @param topics what were the subscribed topics.
* @return future that completes or fails according to the acknowledgement.
*/
CompletionStage<Void> removePolicyNotificationSubscriber(ActorRef subscriber, Collection<String> topics);

/**
* Declare acknowledgement labels for a subscriber.
* Declared acknowledgement labels are globally unique for each subscriber.
Expand Down
Expand Up @@ -38,13 +38,16 @@ final class DittoProtocolSubImpl implements DittoProtocolSub {

private final DistributedSub liveSignalSub;
private final DistributedSub twinEventSub;
private final DistributedSub policyNotificationSub;
private final DistributedAcks distributedAcks;

private DittoProtocolSubImpl(final DistributedSub liveSignalSub,
final DistributedSub twinEventSub,
final DistributedSub policyNotificationSub,
final DistributedAcks distributedAcks) {
this.liveSignalSub = liveSignalSub;
this.twinEventSub = twinEventSub;
this.policyNotificationSub = policyNotificationSub;
this.distributedAcks = distributedAcks;
}

Expand All @@ -53,7 +56,8 @@ static DittoProtocolSubImpl of(final ActorSystem system, final DistributedAcks d
LiveSignalPubSubFactory.of(system, distributedAcks).startDistributedSub();
final DistributedSub twinEventSub =
ThingEventPubSubFactory.readSubjectsOnly(system, distributedAcks).startDistributedSub();
return new DittoProtocolSubImpl(liveSignalSub, twinEventSub, distributedAcks);
final DistributedSub policyNotificationSub = PolicyNotificationPubSubFactory.of(system).startDistributedSub();
return new DittoProtocolSubImpl(liveSignalSub, twinEventSub, policyNotificationSub, distributedAcks);
}

@Override
Expand All @@ -68,6 +72,9 @@ public CompletionStage<Void> subscribe(final Collection<StreamingType> types,
: nop,
hasTwinEvents -> hasTwinEvents
? twinEventSub.subscribeWithFilterAndGroup(topics, subscriber, null, group)
: nop,
hasPolicyNotification -> hasPolicyNotification
? policyNotificationSub.subscribeWithFilterAndGroup(topics, subscriber, null, group)
: nop
);
}
Expand All @@ -76,6 +83,7 @@ public CompletionStage<Void> subscribe(final Collection<StreamingType> types,
public void removeSubscriber(final ActorRef subscriber) {
liveSignalSub.removeSubscriber(subscriber);
twinEventSub.removeSubscriber(subscriber);
policyNotificationSub.removeSubscriber(subscriber);
distributedAcks.removeSubscriber(subscriber);
}

Expand All @@ -88,7 +96,8 @@ public CompletionStage<Void> updateLiveSubscriptions(final Collection<StreamingT
liveTypes -> !liveTypes.isEmpty()
? liveSignalSub.subscribeWithFilterAndGroup(topics, subscriber, toFilter(liveTypes), null)
: liveSignalSub.unsubscribeWithAck(topics, subscriber),
hasTwinEvents -> CompletableFuture.completedFuture(null)
hasTwinEvents -> CompletableFuture.completedStage(null),
hasPolicyNotification -> CompletableFuture.completedStage(null)
);
}

Expand All @@ -97,6 +106,12 @@ public CompletionStage<Void> removeTwinSubscriber(final ActorRef subscriber, fin
return twinEventSub.unsubscribeWithAck(topics, subscriber).thenApply(ack -> null);
}

@Override
public CompletionStage<Void> removePolicyNotificationSubscriber(final ActorRef subscriber,
final Collection<String> topics) {
return policyNotificationSub.unsubscribeWithAck(topics, subscriber).thenApply(ack -> null);
}

@Override
public CompletionStage<Void> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels,
Expand Down Expand Up @@ -132,19 +147,25 @@ public void removeAcknowledgementLabelDeclaration(final ActorRef subscriber) {

private CompletionStage<Void> partitionByStreamingTypes(final Collection<StreamingType> types,
final Function<Set<StreamingType>, CompletionStage<?>> onLiveSignals,
final Function<Boolean, CompletionStage<?>> onTwinEvents) {
final Function<Boolean, CompletionStage<?>> onTwinEvents,
final Function<Boolean, CompletionStage<?>> onPolicyNotifications) {
final Set<StreamingType> liveTypes;
final boolean hasTwinEvents;
final boolean hasPolicyNotifications;
if (types.isEmpty()) {
liveTypes = Collections.emptySet();
hasTwinEvents = false;
hasPolicyNotifications = false;
} else {
liveTypes = EnumSet.copyOf(types);
hasTwinEvents = liveTypes.remove(StreamingType.EVENTS);
hasPolicyNotifications = liveTypes.remove(StreamingType.POLICY_NOTIFICATIONS);
}
final CompletableFuture<?> liveStage = onLiveSignals.apply(liveTypes).toCompletableFuture();
final CompletableFuture<?> twinStage = onTwinEvents.apply(hasTwinEvents).toCompletableFuture();
return CompletableFuture.allOf(liveStage, twinStage);
final CompletableFuture<?> policyNotificationStage =
onPolicyNotifications.apply(hasPolicyNotifications).toCompletableFuture();
return CompletableFuture.allOf(liveStage, twinStage, policyNotificationStage);
}

private static Predicate<Collection<String>> toFilter(final Collection<StreamingType> streamingTypes) {
Expand Down

0 comments on commit 60e710d

Please sign in to comment.