Skip to content

Commit

Permalink
#1894 optimize Ditto internal pub/sub by adding subscribed for namesp…
Browse files Browse the repository at this point in the history
…aces to topic

* this can greatly reduce the amount of message dispatched internally when subscribing only for certain namespaces
  • Loading branch information
thjaeckle committed Feb 9, 2024
1 parent 4a6011b commit 24e1701
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.FatalPubSubException;
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -146,6 +145,7 @@
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor;
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
Expand Down Expand Up @@ -181,7 +181,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private static final Pattern EXCLUDED_ADDRESS_REPORTING_CHILD_NAME_PATTERN = Pattern.compile(
OutboundMappingProcessorActor.ACTOR_NAME + "|" + OutboundDispatchingActor.ACTOR_NAME + "|" +
"ackr.*" + "|" + "StreamSupervisor-.*|" +
SubscriptionManager.ACTOR_NAME + "|" + StreamingSubscriptionManager.ACTOR_NAME);
SubscriptionManager.ACTOR_NAME + "|" + StreamingSubscriptionManager.ACTOR_NAME);
private static final String DITTO_STATE_TIMEOUT_TIMER = "dittoStateTimeout";

private static final int SOCKET_CHECK_TIMEOUT_MS = 2000;
Expand Down Expand Up @@ -330,7 +330,8 @@ protected void init() {
inboundMappingSink = getInboundMappingSink(protocolAdapter, inboundDispatchingSink);
subscriptionManager =
startSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());
streamingSubscriptionManager = startStreamingSubscriptionManager(commandForwarderActorSelection, connectivityConfig().getClientConfig());
streamingSubscriptionManager = startStreamingSubscriptionManager(commandForwarderActorSelection,
connectivityConfig().getClientConfig());

if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) {
tunnelActor = childActorNanny.startChildActor(SshTunnelActor.ACTOR_NAME,
Expand Down Expand Up @@ -1924,7 +1925,8 @@ private ActorRef startSubscriptionManager(final ActorSelection proxyActor, final
*
* @return reference of the streaming subscription manager.
*/
private ActorRef startStreamingSubscriptionManager(final ActorSelection proxyActor, final ClientConfig clientConfig) {
private ActorRef startStreamingSubscriptionManager(final ActorSelection proxyActor,
final ClientConfig clientConfig) {
final var mat = Materializer.createMaterializer(this::getContext);
final var props = StreamingSubscriptionManager.props(clientConfig.getSubscriptionManagerTimeout(),
proxyActor, mat);
Expand Down Expand Up @@ -2098,9 +2100,14 @@ private String getPubsubGroup() {
private Set<String> getTargetAuthSubjects() {
return connection.getTargets()
.stream()
.map(Target::getAuthorizationContext)
.map(AuthorizationContext::getAuthorizationSubjectIds)
.flatMap(List::stream)
.map(target -> {
final Set<String> namespaces = target.getTopics().stream()
.flatMap(ft -> ft.getNamespaces().stream())
.collect(Collectors.toSet());
return ReadSubjectExtractor
.determineTopicsFor(namespaces, target.getAuthorizationContext().getAuthorizationSubjects());
})
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ private Route buildThingsSseRoute(final RequestContext ctx,
jsonPointerString -> {
if (INBOX_OUTBOX_PATTERN.matcher(jsonPointerString).matches()) {
return createMessagesSseRoute(ctx, dhcs, thingId,
jsonPointerString);
jsonPointerString,
getNamespaces(parameters.get(PARAM_NAMESPACES))
);
} else {
params.put(PARAM_FIELDS, jsonPointerString);
return createSseRoute(ctx, dhcs,
Expand Down Expand Up @@ -428,7 +430,7 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
connectionCorrelationId, dittoHeaders);
final var connect = new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
authorizationContext, null);
authorizationContext, namespaces, null);
Patterns.ask(streamingActor, connect, LOCAL_ASK_TIMEOUT)
.thenApply(ActorRef.class::cast)
.thenAccept(streamingSessionActor ->
Expand Down Expand Up @@ -459,7 +461,8 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
private Route createMessagesSseRoute(final RequestContext ctx,
final CompletionStage<DittoHeaders> dittoHeadersStage,
final String thingId,
final String messagePath) {
final String messagePath,
final List<String> namespaces) {

final List<ThingId> targetThingIds = List.of(ThingId.of(thingId));
final CompletionStage<SignalEnrichmentFacade> facadeStage = signalEnrichmentProvider == null
Expand Down Expand Up @@ -490,7 +493,7 @@ private Route createMessagesSseRoute(final RequestContext ctx,
final var connect =
new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
authorizationContext, null);
authorizationContext, namespaces, null);
final String resourcePathRqlStatement;
if (INBOX_OUTBOX_WITH_SUBJECT_PATTERN.matcher(messagePath).matches()) {
resourcePathRqlStatement = String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -575,7 +576,7 @@ private Pair<Connect, Flow<DittoRuntimeException, Message, NotUsed>> createOutgo
return new Connect(withQueue.getSourceQueue(), connectionCorrelationId, STREAMING_TYPE_WS,
version, optJsonWebToken.map(JsonWebToken::getExpirationTime).orElse(null),
readDeclaredAcknowledgementLabels(additionalHeaders), connectionAuthContext,
wsKillSwitch);
List.of(), wsKillSwitch);
})
.recoverWithRetries(1, new PFBuilder<Throwable, Source<SessionedJsonifiable, NotUsed>>()
.match(GatewayWebsocketSessionAbortedException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.pubsub.extractors.ReadSubjectExtractor;
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
Expand Down Expand Up @@ -138,6 +139,7 @@ final class StreamingSessionActor extends AbstractActorWithTimers {
private final Set<AcknowledgementLabel> declaredAcks;
private final ThreadSafeDittoLoggingAdapter logger;
private AuthorizationContext authorizationContext;
private List<String> namespaces;

private Cancellable cancellableShutdownTask;
@Nullable private final KillSwitch killSwitch;
Expand All @@ -164,6 +166,7 @@ private StreamingSessionActor(final Connect connect,
this.jwtAuthenticationResultProvider = jwtAuthenticationResultProvider;
outstandingSubscriptionAcks = EnumSet.noneOf(StreamingType.class);
authorizationContext = connect.getConnectionAuthContext();
namespaces = connect.getNamespaces();
killSwitch = connect.getKillSwitch().orElse(null);
streamingSessions = new EnumMap<>(StreamingType.class);
ackregatorStarter = AcknowledgementAggregatorActorStarter.of(getContext(),
Expand Down Expand Up @@ -373,6 +376,7 @@ private Receive createPubSubBehavior() {
})
.match(StartStreaming.class, startStreaming -> {
authorizationContext = startStreaming.getAuthorizationContext();
namespaces = startStreaming.getNamespaces();
Criteria criteria;
try {
criteria = startStreaming.getFilter()
Expand All @@ -399,7 +403,10 @@ private Receive createPubSubBehavior() {
final var subscribeConfirmation = new ConfirmSubscription(startStreaming.getStreamingType());
final Collection<StreamingType> currentStreamingTypes = streamingSessions.keySet();
dittoProtocolSub.subscribe(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(),
ReadSubjectExtractor.determineTopicsFor(
startStreaming.getNamespaces(),
authorizationContext.getAuthorizationSubjects()
),
getSelf()
).whenComplete((ack, throwable) -> {
if (null == throwable) {
Expand Down Expand Up @@ -444,7 +451,12 @@ private Receive createPubSubBehavior() {
case LIVE_COMMANDS, LIVE_EVENTS, MESSAGES:
default:
dittoProtocolSub.updateLiveSubscriptions(currentStreamingTypes,
authorizationContext.getAuthorizationSubjectIds(), getSelf())
ReadSubjectExtractor.determineTopicsFor(
namespaces,
authorizationContext.getAuthorizationSubjects()
),
getSelf()
)
.thenAccept(ack -> getSelf().tell(unsubscribeConfirmation, getSelf()));
}
})
Expand Down Expand Up @@ -817,7 +829,11 @@ private void startSubscriptionRefreshTimer() {

private void resubscribe(final Control trigger) {
if (!streamingSessions.isEmpty() && outstandingSubscriptionAcks.isEmpty()) {
dittoProtocolSub.subscribe(streamingSessions.keySet(), authorizationContext.getAuthorizationSubjectIds(),
dittoProtocolSub.subscribe(streamingSessions.keySet(),
ReadSubjectExtractor.determineTopicsFor(
namespaces,
authorizationContext.getAuthorizationSubjects()
),
getSelf(), null, true);
}
startSubscriptionRefreshTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import javax.annotation.Nullable;

import org.apache.pekko.stream.KillSwitch;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;

import org.apache.pekko.stream.KillSwitch;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;

/**
* Message to be sent in order to establish a new "streaming" connection via {@link org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor}.
*/
Expand All @@ -41,6 +41,7 @@ public final class Connect {
@Nullable private final Instant sessionExpirationTime;
private final Set<AcknowledgementLabel> declaredAcknowledgementLabels;
private final AuthorizationContext connectionAuthContext;
private final List<String> namespaces;
@Nullable private final KillSwitch killSwitch;

/**
Expand All @@ -53,6 +54,7 @@ public final class Connect {
* @param sessionExpirationTime how long to keep the session alive when idling.
* @param declaredAcknowledgementLabels labels of acknowledgements this session may send.
* @param connectionAuthContext the authorizationContext of the streaming session.
* @param namespaces TODO TJ doc
* @param killSwitch the kill switch to terminate the streaming session.
*/
public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndResponsePublisher,
Expand All @@ -62,6 +64,7 @@ public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndRespo
@Nullable final Instant sessionExpirationTime,
final Set<AcknowledgementLabel> declaredAcknowledgementLabels,
final AuthorizationContext connectionAuthContext,
final List<String> namespaces,
@Nullable final KillSwitch killSwitch) {
this.eventAndResponsePublisher = eventAndResponsePublisher;
this.connectionCorrelationId = checkNotNull(connectionCorrelationId, "connectionCorrelationId")
Expand All @@ -71,6 +74,7 @@ public Connect(final SourceQueueWithComplete<SessionedJsonifiable> eventAndRespo
this.sessionExpirationTime = sessionExpirationTime;
this.declaredAcknowledgementLabels = declaredAcknowledgementLabels;
this.connectionAuthContext = connectionAuthContext;
this.namespaces = namespaces;
this.killSwitch = killSwitch;
}

Expand Down Expand Up @@ -102,6 +106,10 @@ public AuthorizationContext getConnectionAuthContext() {
return connectionAuthContext;
}

public List<String> getNamespaces() {
return namespaces;
}

public Optional<KillSwitch> getKillSwitch() {
return Optional.ofNullable(killSwitch);
}
Expand All @@ -121,13 +129,14 @@ public boolean equals(final Object o) {
Objects.equals(sessionExpirationTime, connect.sessionExpirationTime) &&
Objects.equals(declaredAcknowledgementLabels, connect.declaredAcknowledgementLabels) &&
Objects.equals(connectionAuthContext, connect.connectionAuthContext) &&
Objects.equals(namespaces, connect.namespaces) &&
Objects.equals(killSwitch, connect.killSwitch);
}

@Override
public int hashCode() {
return Objects.hash(eventAndResponsePublisher, connectionCorrelationId, type, sessionExpirationTime,
declaredAcknowledgementLabels, connectionAuthContext, killSwitch);
declaredAcknowledgementLabels, connectionAuthContext, namespaces, killSwitch);
}

@Override
Expand All @@ -139,6 +148,7 @@ public String toString() {
", sessionExpirationTime=" + sessionExpirationTime +
", declaredAcknowledgementLabels=" + declaredAcknowledgementLabels +
", connectionAuthContext=" + connectionAuthContext +
", namespaces=" + namespaces +
", killSwitch=" + killSwitch +
"]";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory;
Expand Down Expand Up @@ -61,19 +74,6 @@

import com.typesafe.config.ConfigFactory;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.testkit.javadsl.TestKit;
import scala.concurrent.duration.FiniteDuration;

/**
Expand Down Expand Up @@ -196,6 +196,7 @@ private ActorRef createStreamingSessionActor() {
final Connect connect =
new Connect(sourceQueue, "connectionCorrelationId", "ws",
JsonSchemaVersion.V_2, null, Set.of(), AuthorizationModelFactory.emptyAuthContext(),
List.of(),
null);
final Props props = StreamingSessionActor.props(connect, dittoProtocolSub, commandRouterProbe.ref(),
DefaultStreamingConfig.of(ConfigFactory.empty()), HeaderTranslator.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ private Connect getConnect(final Set<AcknowledgementLabel> declaredAcks) {
null,
declaredAcks,
authorizationContext,
List.of(),
killSwitch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Extension;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;

/**
* Subscriptions for Ditto protocol channels.
Expand Down Expand Up @@ -55,7 +54,7 @@ default CompletionStage<Void> subscribe(Collection<StreamingType> types, Collect
* check for resubscriptions.
*/
CompletionStage<Boolean> subscribe(Collection<StreamingType> types, Collection<String> topics, ActorRef subscriber,
@Nullable String group, final boolean resubscribe);
@Nullable String group, boolean resubscribe);

/**
* Remove a subscriber.
Expand Down

0 comments on commit 24e1701

Please sign in to comment.