Skip to content

Commit

Permalink
#1894 optimize Ditto internal pub/sub by adding subscribed for namesp…
Browse files Browse the repository at this point in the history
…aces to topic

* this can greatly reduce the amount of message dispatched internally when subscribing only for certain namespaces
  • Loading branch information
thjaeckle committed Feb 13, 2024
1 parent 593323b commit 3b6362b
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 44 deletions.
Expand Up @@ -74,7 +74,6 @@
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.FatalPubSubException;
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -146,6 +145,7 @@
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor;
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
Expand Down Expand Up @@ -181,7 +181,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private static final Pattern EXCLUDED_ADDRESS_REPORTING_CHILD_NAME_PATTERN = Pattern.compile(
OutboundMappingProcessorActor.ACTOR_NAME + "|" + OutboundDispatchingActor.ACTOR_NAME + "|" +
"ackr.*" + "|" + "StreamSupervisor-.*|" +
SubscriptionManager.ACTOR_NAME + "|" + StreamingSubscriptionManager.ACTOR_NAME);
SubscriptionManager.ACTOR_NAME + "|" + StreamingSubscriptionManager.ACTOR_NAME);
private static final String DITTO_STATE_TIMEOUT_TIMER = "dittoStateTimeout";

private static final int SOCKET_CHECK_TIMEOUT_MS = 2000;
Expand Down Expand Up @@ -330,7 +330,8 @@ protected void init() {
inboundMappingSink = getInboundMappingSink(protocolAdapter, inboundDispatchingSink);
subscriptionManager =
startSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());
streamingSubscriptionManager = startStreamingSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());
streamingSubscriptionManager = startStreamingSubscriptionManager(commandForwarderActorSelection,
connectivityConfig().getClientConfig());

if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) {
tunnelActor = childActorNanny.startChildActor(SshTunnelActor.ACTOR_NAME,
Expand Down Expand Up @@ -1924,7 +1925,8 @@ private ActorRef startSubscriptionManager(final ActorSelection proxyActor, final
*
* @return reference of the streaming subscription manager.
*/
private ActorRef startStreamingSubscriptionManager(final ActorSelection proxyActor, final ClientConfig clientConfig) {
private ActorRef startStreamingSubscriptionManager(final ActorSelection proxyActor,
final ClientConfig clientConfig) {
final var mat = Materializer.createMaterializer(this::getContext);
final var props = StreamingSubscriptionManager.props(clientConfig.getSubscriptionManagerTimeout(),
proxyActor, mat);
Expand Down Expand Up @@ -2098,9 +2100,14 @@ private String getPubsubGroup() {
private Set<String> getTargetAuthSubjects() {
return connection.getTargets()
.stream()
.map(Target::getAuthorizationContext)
.map(AuthorizationContext::getAuthorizationSubjectIds)
.flatMap(List::stream)
.map(target -> {
final Set<String> namespaces = target.getTopics().stream()
.flatMap(ft -> ft.getNamespaces().stream())
.collect(Collectors.toSet());
return ReadSubjectExtractor
.determineTopicsFor(namespaces, target.getAuthorizationContext().getAuthorizationSubjects());
})
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}

Expand Down
Expand Up @@ -297,7 +297,9 @@ private Route buildThingsSseRoute(final RequestContext ctx,
jsonPointerString -> {
if (INBOX_OUTBOX_PATTERN.matcher(jsonPointerString).matches()) {
return createMessagesSseRoute(ctx, dhcs, thingId,
jsonPointerString);
jsonPointerString,
getNamespaces(parameters.get(PARAM_NAMESPACES))
);
} else {
params.put(PARAM_FIELDS, jsonPointerString);
return createSseRoute(ctx, dhcs,
Expand Down Expand Up @@ -428,7 +430,7 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
connectionCorrelationId, dittoHeaders);
final var connect = new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
authorizationContext, null);
authorizationContext, namespaces, null);
Patterns.ask(streamingActor, connect, LOCAL_ASK_TIMEOUT)
.thenApply(ActorRef.class::cast)
.thenAccept(streamingSessionActor ->
Expand Down Expand Up @@ -459,7 +461,8 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
private Route createMessagesSseRoute(final RequestContext ctx,
final CompletionStage<DittoHeaders> dittoHeadersStage,
final String thingId,
final String messagePath) {
final String messagePath,
final List<String> namespaces) {

final List<ThingId> targetThingIds = List.of(ThingId.of(thingId));
final CompletionStage<SignalEnrichmentFacade> facadeStage = signalEnrichmentProvider == null
Expand Down Expand Up @@ -490,7 +493,7 @@ private Route createMessagesSseRoute(final RequestContext ctx,
final var connect =
new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
authorizationContext, null);
authorizationContext, namespaces, null);
final String resourcePathRqlStatement;
if (INBOX_OUTBOX_WITH_SUBJECT_PATTERN.matcher(messagePath).matches()) {
resourcePathRqlStatement = String.format(
Expand Down
Expand Up @@ -18,6 +18,7 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -575,7 +576,7 @@ private Pair<Connect, Flow<DittoRuntimeException, Message, NotUsed>> createOutgo
return new Connect(withQueue.getSourceQueue(), connectionCorrelationId, STREAMING_TYPE_WS,
version, optJsonWebToken.map(JsonWebToken::getExpirationTime).orElse(null),
readDeclaredAcknowledgementLabels(additionalHeaders), connectionAuthContext,
wsKillSwitch);
List.of(), wsKillSwitch);
})
.recoverWithRetries(1, new PFBuilder<Throwable, Source<SessionedJsonifiable, NotUsed>>()
.match(GatewayWebsocketSessionAbortedException.class,
Expand Down
Expand Up @@ -88,6 +88,7 @@
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor;
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
Expand Down Expand Up @@ -138,6 +139,7 @@ final class StreamingSessionActor extends AbstractActorWithTimers {
private final Set<AcknowledgementLabel> declaredAcks;
private final ThreadSafeDittoLoggingAdapter logger;
private AuthorizationContext authorizationContext;
private List<String> namespaces;

private Cancellable cancellableShutdownTask;
@Nullable private final KillSwitch killSwitch;
Expand All @@ -164,6 +166,7 @@ private StreamingSessionActor(final Connect connect,
this.jwtAuthenticationResultProvider = jwtAuthenticationResultProvider;
outstandingSubscriptionAcks = EnumSet.noneOf(StreamingType.class);
authorizationContext = connect.getConnectionAuthContext();
namespaces = connect.getNamespaces();
killSwitch = connect.getKillSwitch().orElse(null);
streamingSessions = new EnumMap<>(StreamingType.class);
ackregatorStarter = AcknowledgementAggregatorActorStarter.of(getContext(),
Expand Down Expand Up @@ -373,6 +376,7 @@ private Receive createPubSubBehavior() {
})
.match(StartStreaming.class, startStreaming -> {
authorizationContext = startStreaming.getAuthorizationContext();
namespaces = startStreaming.getNamespaces();
Criteria criteria;
try {
criteria = startStreaming.getFilter()
Expand All @@ -399,7 +403,10 @@ private Receive createPubSubBehavior() {
final var subscribeConfirmation = new ConfirmSubscription(startStreaming.getStreamingType());
final Collection<StreamingType> currentStreamingTypes = streamingSessions.keySet();
dittoProtocolSub.subscribe(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(),
ReadSubjectExtractor.determineTopicsFor(
startStreaming.getNamespaces(),
authorizationContext.getAuthorizationSubjects()
),
getSelf()
).whenComplete((ack, throwable) -> {
if (null == throwable) {
Expand Down Expand Up @@ -444,7 +451,12 @@ private Receive createPubSubBehavior() {
case LIVE_COMMANDS, LIVE_EVENTS, MESSAGES:
default:
dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(), getSelf())
ReadSubjectExtractor.determineTopicsFor(
namespaces,
authorizationContext.getAuthorizationSubjects()
),
getSelf()
)
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
}
})
Expand Down Expand Up @@ -817,7 +829,11 @@ private void startSubscriptionRefreshTimer() {

private void resubscribe(final Control trigger) {
if (!streamingSessions.isEmpty() && outstandingSubscriptionAcks.isEmpty()) {
dittoProtocolSub.subscribe(streamingSessions.keySet(), authorizationContext.getAuthorizationSubjectIds(),
dittoProtocolSub.subscribe(streamingSessions.keySet(),
ReadSubjectExtractor.determineTopicsFor(
namespaces,
authorizationContext.getAuthorizationSubjects()
),
getSelf(), null, true);
}
startSubscriptionRefreshTimer();
Expand Down
Expand Up @@ -15,20 +15,20 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import javax.annotation.Nullable;

import org.apache.pekko.stream.KillSwitch;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;

import org.apache.pekko.stream.KillSwitch;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;

/**
* Message to be sent in order to establish a new "streaming" connection via {@link org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor}.
*/
Expand All @@ -41,6 +41,7 @@ public final class Connect {
@Nullable private final Instant sessionExpirationTime;
private final Set<AcknowledgementLabel> declaredAcknowledgementLabels;
private final AuthorizationContext connectionAuthContext;
private final List<String> namespaces;
@Nullable private final KillSwitch killSwitch;

/**
Expand All @@ -53,6 +54,7 @@ public final class Connect {
* @param sessionExpirationTime how long to keep the session alive when idling.
* @param declaredAcknowledgementLabels labels of acknowledgements this session may send.
* @param connectionAuthContext the authorizationContext of the streaming session.
* @param namespaces the namespaces to subscribe to in the streaming session (if already known).
* @param killSwitch the kill switch to terminate the streaming session.
*/
public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndResponsePublisher,
Expand All @@ -62,6 +64,7 @@ public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndRespo
@Nullable final Instant sessionExpirationTime,
final Set<AcknowledgementLabel> declaredAcknowledgementLabels,
final AuthorizationContext connectionAuthContext,
final List<String> namespaces,
@Nullable final KillSwitch killSwitch) {
this.eventAndResponsePublisher = eventAndResponsePublisher;
this.connectionCorrelationId = checkNotNull(connectionCorrelationId, "connectionCorrelationId")
Expand All @@ -71,6 +74,7 @@ public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndRespo
this.sessionExpirationTime = sessionExpirationTime;
this.declaredAcknowledgementLabels = declaredAcknowledgementLabels;
this.connectionAuthContext = connectionAuthContext;
this.namespaces = namespaces;
this.killSwitch = killSwitch;
}

Expand Down Expand Up @@ -102,6 +106,10 @@ public AuthorizationContext getConnectionAuthContext() {
return connectionAuthContext;
}

public List<String> getNamespaces() {
return namespaces;
}

public Optional<KillSwitch> getKillSwitch() {
return Optional.ofNullable(killSwitch);
}
Expand All @@ -121,13 +129,14 @@ public boolean equals(final Object o) {
Objects.equals(sessionExpirationTime, connect.sessionExpirationTime) &&
Objects.equals(declaredAcknowledgementLabels, connect.declaredAcknowledgementLabels) &&
Objects.equals(connectionAuthContext, connect.connectionAuthContext) &&
Objects.equals(namespaces, connect.namespaces) &&
Objects.equals(killSwitch, connect.killSwitch);
}

@Override
public int hashCode() {
return Objects.hash(eventAndResponsePublisher, connectionCorrelationId, type, sessionExpirationTime,
declaredAcknowledgementLabels, connectionAuthContext, killSwitch);
declaredAcknowledgementLabels, connectionAuthContext, namespaces, killSwitch);
}

@Override
Expand All @@ -139,6 +148,7 @@ public String toString() {
", sessionExpirationTime=" + sessionExpirationTime +
", declaredAcknowledgementLabels=" + declaredAcknowledgementLabels +
", connectionAuthContext=" + connectionAuthContext +
", namespaces=" + namespaces +
", killSwitch=" + killSwitch +
"]";
}
Expand Down
Expand Up @@ -25,6 +25,19 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory;
Expand Down Expand Up @@ -61,19 +74,6 @@

import com.typesafe.config.ConfigFactory;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.testkit.javadsl.TestKit;
import scala.concurrent.duration.FiniteDuration;

/**
Expand Down Expand Up @@ -196,6 +196,7 @@ private ActorRef createStreamingSessionActor() {
final Connect connect =
new Connect(sourceQueue, "connectionCorrelationId", "ws",
JsonSchemaVersion.V_2, null, Set.of(), AuthorizationModelFactory.emptyAuthContext(),
List.of(),
null);
final Props props = StreamingSessionActor.props(connect, dittoProtocolSub, commandRouterProbe.ref(),
DefaultStreamingConfig.of(ConfigFactory.empty()), HeaderTranslator.empty(),
Expand Down
Expand Up @@ -433,6 +433,7 @@ private Connect getConnect(final Set<AcknowledgementLabel> declaredAcks) {
null,
declaredAcks,
authorizationContext,
List.of(),
killSwitch);
}

Expand Down
Expand Up @@ -19,12 +19,11 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Extension;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;

/**
* Subscriptions for Ditto protocol channels.
Expand Down Expand Up @@ -55,7 +54,7 @@ default CompletionStage<Void> subscribe(Collection<StreamingType> types, Collect
* check for resubscriptions.
*/
CompletionStage<Boolean> subscribe(Collection<StreamingType> types, Collection<String> topics, ActorRef subscriber,
@Nullable String group, final boolean resubscribe);
@Nullable String group, boolean resubscribe);

/**
* Remove a subscriber.
Expand Down

0 comments on commit 3b6362b

Please sign in to comment.