From 3b6362b3a35cdac8fd3e03b66cdb84fcdd94a86b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20J=C3=A4ckle?= Date: Fri, 9 Feb 2024 16:43:06 +0100 Subject: [PATCH] #1894 optimize Ditto internal pub/sub by adding subscribed for namespaces to topic * this can greatly reduce the amount of message dispatched internally when subscribing only for certain namespaces --- .../service/messaging/BaseClientActor.java | 21 ++++++--- .../routes/sse/ThingsSseRouteBuilder.java | 11 +++-- .../routes/websocket/WebSocketRoute.java | 3 +- .../actors/StreamingSessionActor.java | 22 +++++++-- .../service/streaming/signals/Connect.java | 18 ++++++-- ...mingSessionActorHeaderInteractionTest.java | 27 +++++------ .../actors/StreamingSessionActorTest.java | 1 + .../utils/pubsubthings/DittoProtocolSub.java | 7 ++- .../internal/utils/pubsub/DistributedPub.java | 7 ++- .../internal/utils/pubsub/DistributedSub.java | 5 +- .../extractors/ReadSubjectExtractor.java | 46 ++++++++++++++++++- 11 files changed, 124 insertions(+), 44 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java index 62c2ef7a58..e72254d356 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java @@ -74,7 +74,6 @@ import org.eclipse.ditto.base.model.acks.AcknowledgementLabel; import org.eclipse.ditto.base.model.acks.FatalPubSubException; import org.eclipse.ditto.base.model.acks.PubSubTerminatedException; -import org.eclipse.ditto.base.model.auth.AuthorizationContext; import org.eclipse.ditto.base.model.entity.id.EntityId; import org.eclipse.ditto.base.model.entity.id.WithEntityId; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; @@ -146,6 +145,7 @@ import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider; import org.eclipse.ditto.internal.utils.pubsub.StreamingType; +import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor; import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub; import org.eclipse.ditto.internal.utils.search.SubscriptionManager; import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; @@ -181,7 +181,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash getTargetAuthSubjects() { return connection.getTargets() .stream() - .map(Target::getAuthorizationContext) - .map(AuthorizationContext::getAuthorizationSubjectIds) - .flatMap(List::stream) + .map(target -> { + final Set namespaces = target.getTopics().stream() + .flatMap(ft -> ft.getNamespaces().stream()) + .collect(Collectors.toSet()); + return ReadSubjectExtractor + .determineTopicsFor(namespaces, target.getAuthorizationContext().getAuthorizationSubjects()); + }) + .flatMap(Collection::stream) .collect(Collectors.toSet()); } diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java index f2efc90cca..5465cffc37 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java @@ -297,7 +297,9 @@ private Route buildThingsSseRoute(final RequestContext ctx, jsonPointerString -> { if (INBOX_OUTBOX_PATTERN.matcher(jsonPointerString).matches()) { return createMessagesSseRoute(ctx, dhcs, thingId, - jsonPointerString); + jsonPointerString, + getNamespaces(parameters.get(PARAM_NAMESPACES)) + ); } else { params.put(PARAM_FIELDS, jsonPointerString); return createSseRoute(ctx, dhcs, @@ -428,7 +430,7 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage @@ -459,7 +461,8 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage dittoHeadersStage, final String thingId, - final String messagePath) { + final String messagePath, + final List namespaces) { final List targetThingIds = List.of(ThingId.of(thingId)); final CompletionStage facadeStage = signalEnrichmentProvider == null @@ -490,7 +493,7 @@ private Route createMessagesSseRoute(final RequestContext ctx, final var connect = new Connect(withQueue.getSourceQueue(), connectionCorrelationId, STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(), - authorizationContext, null); + authorizationContext, namespaces, null); final String resourcePathRqlStatement; if (INBOX_OUTBOX_WITH_SUBJECT_PATTERN.matcher(messagePath).matches()) { resourcePathRqlStatement = String.format( diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.java index ef630b0453..0a7f969aa6 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -575,7 +576,7 @@ private Pair> createOutgo return new Connect(withQueue.getSourceQueue(), connectionCorrelationId, STREAMING_TYPE_WS, version, optJsonWebToken.map(JsonWebToken::getExpirationTime).orElse(null), readDeclaredAcknowledgementLabels(additionalHeaders), connectionAuthContext, - wsKillSwitch); + List.of(), wsKillSwitch); }) .recoverWithRetries(1, new PFBuilder>() .match(GatewayWebsocketSessionAbortedException.class, diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java index 7e9c307366..adfe698e29 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java @@ -88,6 +88,7 @@ import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.pubsub.StreamingType; +import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor; import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub; import org.eclipse.ditto.internal.utils.search.SubscriptionManager; import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken; @@ -138,6 +139,7 @@ final class StreamingSessionActor extends AbstractActorWithTimers { private final Set declaredAcks; private final ThreadSafeDittoLoggingAdapter logger; private AuthorizationContext authorizationContext; + private List namespaces; private Cancellable cancellableShutdownTask; @Nullable private final KillSwitch killSwitch; @@ -164,6 +166,7 @@ private StreamingSessionActor(final Connect connect, this.jwtAuthenticationResultProvider = jwtAuthenticationResultProvider; outstandingSubscriptionAcks = EnumSet.noneOf(StreamingType.class); authorizationContext = connect.getConnectionAuthContext(); + namespaces = connect.getNamespaces(); killSwitch = connect.getKillSwitch().orElse(null); streamingSessions = new EnumMap<>(StreamingType.class); ackregatorStarter = AcknowledgementAggregatorActorStarter.of(getContext(), @@ -373,6 +376,7 @@ private Receive createPubSubBehavior() { }) .match(StartStreaming.class, startStreaming -> { authorizationContext = startStreaming.getAuthorizationContext(); + namespaces = startStreaming.getNamespaces(); Criteria criteria; try { criteria = startStreaming.getFilter() @@ -399,7 +403,10 @@ private Receive createPubSubBehavior() { final var subscribeConfirmation = new ConfirmSubscription(startStreaming.getStreamingType()); final Collection currentStreamingTypes = streamingSessions.keySet(); dittoProtocolSub.subscribe(currentStreamingTypes, - authorizationContext.getAuthorizationSubjectIds(), + ReadSubjectExtractor.determineTopicsFor( + startStreaming.getNamespaces(), + authorizationContext.getAuthorizationSubjects() + ), getSelf() ).whenComplete((ack, throwable) -> { if (null == throwable) { @@ -444,7 +451,12 @@ private Receive createPubSubBehavior() { case LIVE_COMMANDS, LIVE_EVENTS, MESSAGES: default: dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes, - authorizationContext.getAuthorizationSubjectIds(), getSelf()) + ReadSubjectExtractor.determineTopicsFor( + namespaces, + authorizationContext.getAuthorizationSubjects() + ), + getSelf() + ) .thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf())); } }) @@ -817,7 +829,11 @@ private void startSubscriptionRefreshTimer() { private void resubscribe(final Control trigger) { if (!streamingSessions.isEmpty() && outstandingSubscriptionAcks.isEmpty()) { - dittoProtocolSub.subscribe(streamingSessions.keySet(), authorizationContext.getAuthorizationSubjectIds(), + dittoProtocolSub.subscribe(streamingSessions.keySet(), + ReadSubjectExtractor.determineTopicsFor( + namespaces, + authorizationContext.getAuthorizationSubjects() + ), getSelf(), null, true); } startSubscriptionRefreshTimer(); diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/signals/Connect.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/signals/Connect.java index 09c3f9f945..3af4890330 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/signals/Connect.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/signals/Connect.java @@ -15,20 +15,20 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; import java.time.Instant; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; import javax.annotation.Nullable; +import org.apache.pekko.stream.KillSwitch; +import org.apache.pekko.stream.javadsl.SourceQueueWithComplete; import org.eclipse.ditto.base.model.acks.AcknowledgementLabel; import org.eclipse.ditto.base.model.auth.AuthorizationContext; import org.eclipse.ditto.base.model.json.JsonSchemaVersion; import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable; -import org.apache.pekko.stream.KillSwitch; -import org.apache.pekko.stream.javadsl.SourceQueueWithComplete; - /** * Message to be sent in order to establish a new "streaming" connection via {@link org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor}. */ @@ -41,6 +41,7 @@ public final class Connect { @Nullable private final Instant sessionExpirationTime; private final Set declaredAcknowledgementLabels; private final AuthorizationContext connectionAuthContext; + private final List namespaces; @Nullable private final KillSwitch killSwitch; /** @@ -53,6 +54,7 @@ public final class Connect { * @param sessionExpirationTime how long to keep the session alive when idling. * @param declaredAcknowledgementLabels labels of acknowledgements this session may send. * @param connectionAuthContext the authorizationContext of the streaming session. + * @param namespaces the namespaces to subscribe to in the streaming session (if already known). * @param killSwitch the kill switch to terminate the streaming session. */ public Connect(final SourceQueueWithComplete eventAndResponsePublisher, @@ -62,6 +64,7 @@ public Connect(final SourceQueueWithComplete eventAndRespo @Nullable final Instant sessionExpirationTime, final Set declaredAcknowledgementLabels, final AuthorizationContext connectionAuthContext, + final List namespaces, @Nullable final KillSwitch killSwitch) { this.eventAndResponsePublisher = eventAndResponsePublisher; this.connectionCorrelationId = checkNotNull(connectionCorrelationId, "connectionCorrelationId") @@ -71,6 +74,7 @@ public Connect(final SourceQueueWithComplete eventAndRespo this.sessionExpirationTime = sessionExpirationTime; this.declaredAcknowledgementLabels = declaredAcknowledgementLabels; this.connectionAuthContext = connectionAuthContext; + this.namespaces = namespaces; this.killSwitch = killSwitch; } @@ -102,6 +106,10 @@ public AuthorizationContext getConnectionAuthContext() { return connectionAuthContext; } + public List getNamespaces() { + return namespaces; + } + public Optional getKillSwitch() { return Optional.ofNullable(killSwitch); } @@ -121,13 +129,14 @@ public boolean equals(final Object o) { Objects.equals(sessionExpirationTime, connect.sessionExpirationTime) && Objects.equals(declaredAcknowledgementLabels, connect.declaredAcknowledgementLabels) && Objects.equals(connectionAuthContext, connect.connectionAuthContext) && + Objects.equals(namespaces, connect.namespaces) && Objects.equals(killSwitch, connect.killSwitch); } @Override public int hashCode() { return Objects.hash(eventAndResponsePublisher, connectionCorrelationId, type, sessionExpirationTime, - declaredAcknowledgementLabels, connectionAuthContext, killSwitch); + declaredAcknowledgementLabels, connectionAuthContext, namespaces, killSwitch); } @Override @@ -139,6 +148,7 @@ public String toString() { ", sessionExpirationTime=" + sessionExpirationTime + ", declaredAcknowledgementLabels=" + declaredAcknowledgementLabels + ", connectionAuthContext=" + connectionAuthContext + + ", namespaces=" + namespaces + ", killSwitch=" + killSwitch + "]"; } diff --git a/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorHeaderInteractionTest.java b/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorHeaderInteractionTest.java index 283d2c2990..5c8ee83ef2 100644 --- a/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorHeaderInteractionTest.java +++ b/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorHeaderInteractionTest.java @@ -25,6 +25,19 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import org.apache.pekko.actor.AbstractActor; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Props; +import org.apache.pekko.japi.pf.ReceiveBuilder; +import org.apache.pekko.stream.Attributes; +import org.apache.pekko.stream.OverflowStrategy; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.stream.javadsl.SourceQueueWithComplete; +import org.apache.pekko.testkit.TestProbe; +import org.apache.pekko.testkit.javadsl.TestKit; import org.eclipse.ditto.base.model.acks.AcknowledgementRequest; import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel; import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory; @@ -61,19 +74,6 @@ import com.typesafe.config.ConfigFactory; -import org.apache.pekko.actor.AbstractActor; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Props; -import org.apache.pekko.japi.pf.ReceiveBuilder; -import org.apache.pekko.stream.Attributes; -import org.apache.pekko.stream.OverflowStrategy; -import org.apache.pekko.stream.javadsl.Keep; -import org.apache.pekko.stream.javadsl.Sink; -import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.stream.javadsl.SourceQueueWithComplete; -import org.apache.pekko.testkit.TestProbe; -import org.apache.pekko.testkit.javadsl.TestKit; import scala.concurrent.duration.FiniteDuration; /** @@ -196,6 +196,7 @@ private ActorRef createStreamingSessionActor() { final Connect connect = new Connect(sourceQueue, "connectionCorrelationId", "ws", JsonSchemaVersion.V_2, null, Set.of(), AuthorizationModelFactory.emptyAuthContext(), + List.of(), null); final Props props = StreamingSessionActor.props(connect, dittoProtocolSub, commandRouterProbe.ref(), DefaultStreamingConfig.of(ConfigFactory.empty()), HeaderTranslator.empty(), diff --git a/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorTest.java b/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorTest.java index 9133c11cb3..02b9fdf56a 100644 --- a/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorTest.java +++ b/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActorTest.java @@ -433,6 +433,7 @@ private Connect getConnect(final Set declaredAcks) { null, declaredAcks, authorizationContext, + List.of(), killSwitch); } diff --git a/internal/utils/pubsub-things/src/main/java/org/eclipse/ditto/internal/utils/pubsubthings/DittoProtocolSub.java b/internal/utils/pubsub-things/src/main/java/org/eclipse/ditto/internal/utils/pubsubthings/DittoProtocolSub.java index a0692cf64e..477f8c7c74 100644 --- a/internal/utils/pubsub-things/src/main/java/org/eclipse/ditto/internal/utils/pubsubthings/DittoProtocolSub.java +++ b/internal/utils/pubsub-things/src/main/java/org/eclipse/ditto/internal/utils/pubsubthings/DittoProtocolSub.java @@ -19,12 +19,11 @@ import javax.annotation.Nullable; -import org.eclipse.ditto.base.model.acks.AcknowledgementLabel; -import org.eclipse.ditto.internal.utils.pubsub.StreamingType; - import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Extension; +import org.eclipse.ditto.base.model.acks.AcknowledgementLabel; +import org.eclipse.ditto.internal.utils.pubsub.StreamingType; /** * Subscriptions for Ditto protocol channels. @@ -55,7 +54,7 @@ default CompletionStage subscribe(Collection types, Collect * check for resubscriptions. */ CompletionStage subscribe(Collection types, Collection topics, ActorRef subscriber, - @Nullable String group, final boolean resubscribe); + @Nullable String group, boolean resubscribe); /** * Remove a subscriber. diff --git a/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/DistributedPub.java b/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/DistributedPub.java index 185e33083b..d002911473 100644 --- a/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/DistributedPub.java +++ b/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/DistributedPub.java @@ -14,12 +14,11 @@ import javax.annotation.Nullable; +import org.apache.pekko.actor.ActorRef; import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor; import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor; -import org.apache.pekko.actor.ActorRef; - /** * A jolly locale for the spreading of news. * @@ -42,7 +41,7 @@ public interface DistributedPub { * of a signal to ensure event order for each entity. * @return the wrapped message to send to the publisher. */ - Object wrapForPublication(T message, final CharSequence groupIndexKey); + Object wrapForPublication(T message, CharSequence groupIndexKey); /** * Wrap the message in an envelope to send to the publisher. @@ -53,7 +52,7 @@ public interface DistributedPub { * @param ackExtractor extractor of ack-related information from the message. * @return the wrapped message to send to the publisher. */ - Object wrapForPublicationWithAcks(S message, final CharSequence groupIndexKey, + Object wrapForPublicationWithAcks(S message, CharSequence groupIndexKey, AckExtractor ackExtractor); /** diff --git a/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/DistributedSub.java b/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/DistributedSub.java index 1dbe9bb3a2..e2ef4988be 100644 --- a/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/DistributedSub.java +++ b/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/DistributedSub.java @@ -18,11 +18,10 @@ import javax.annotation.Nullable; +import org.apache.pekko.actor.ActorRef; import org.eclipse.ditto.internal.utils.ddata.DistributedDataConfig; import org.eclipse.ditto.internal.utils.pubsub.api.SubAck; -import org.apache.pekko.actor.ActorRef; - /** * Access point for Ditto pub-sub subscribers. */ @@ -40,7 +39,7 @@ public interface DistributedSub { */ CompletionStage subscribeWithFilterAndGroup(Collection topics, ActorRef subscriber, @Nullable Predicate> filter, @Nullable String group, - final boolean resubscribe); + boolean resubscribe); /** * Unsubscribe for a collection of topics. diff --git a/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/extractors/ReadSubjectExtractor.java b/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/extractors/ReadSubjectExtractor.java index 7ad9eb660e..5a36e2b3c3 100644 --- a/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/extractors/ReadSubjectExtractor.java +++ b/internal/utils/pubsub/src/main/java/org/eclipse/ditto/internal/utils/pubsub/extractors/ReadSubjectExtractor.java @@ -15,8 +15,11 @@ import java.util.Collection; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.eclipse.ditto.base.model.auth.AuthorizationSubject; +import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId; +import org.eclipse.ditto.base.model.entity.id.WithEntityId; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; @@ -39,14 +42,55 @@ public static ReadSubjectExtractor of() { return new ReadSubjectExtractor<>(); } + /** + * Determines the pub/sub topics based on the provided {@code namespaces} and {@code authorizationSubjects}. + * + * @param namespaces the namespaces for which pub/sub topics should be determined for - may be empty if all + * namespaces are relevant and no scoping to namespaces should be done. + * @param authorizationSubjects the authorization subjects to subscribe for. + * @return a set of Ditto pub/sub topics with e.g. to subscribe for events in the cluster. + */ + public static Set determineTopicsFor( + final Collection namespaces, + final Collection authorizationSubjects + ) { + + final Set authorizationSubjectIds = authorizationSubjects.stream() + .map(AuthorizationSubject::getId) + .collect(Collectors.toSet()); + if (namespaces.isEmpty()) { + return authorizationSubjectIds; + } else { + return namespaces.stream().flatMap( + namespace -> combineNamespaceWithAuthSubjects(namespace, authorizationSubjectIds) + ).collect(Collectors.toSet()); + } + } + @Override public Collection getTopics(final T event) { final DittoHeaders dittoHeaders = event.getDittoHeaders(); final Set readGrantedSubjects = dittoHeaders.getReadGrantedSubjects(); - return readGrantedSubjects.stream() + final Set topicsWithoutNamespace = readGrantedSubjects.stream() .map(AuthorizationSubject::getId) .collect(Collectors.toSet()); + + if (event instanceof WithEntityId withEntityId && + withEntityId.getEntityId() instanceof NamespacedEntityId namespacedEntityId) { + final String namespace = namespacedEntityId.getNamespace(); + return Stream.concat( + combineNamespaceWithAuthSubjects(namespace, topicsWithoutNamespace), + topicsWithoutNamespace.stream() + ).collect(Collectors.toSet()); + } else { + return topicsWithoutNamespace; + } + } + + private static Stream combineNamespaceWithAuthSubjects(final String namespace, + final Set authorizationSubjectIds) { + return authorizationSubjectIds.stream().map(subject -> namespace + "#" + subject); } }