Skip to content

Commit

Permalink
Don't subscribe to internal Ditto PubSub for connectivity announcemen…
Browse files Browse the repository at this point in the history
…ts as they are only sent outwards

Signed-off-by: Florian Fendt <Florian.Fendt@bosch.io>
  • Loading branch information
ffendt committed May 5, 2021
1 parent d66a421 commit 5b43125
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1761,14 +1761,21 @@ private CompletionStage<Void> subscribeAndDeclareAcknowledgementLabels() {
return CompletableFuture.completedFuture(null);
} else {
final String group = getPubsubGroup();
final CompletionStage<Void> subscribe =
dittoProtocolSub.subscribe(getUniqueStreamingTypes(), getTargetAuthSubjects(), getSelf(), group);
final CompletionStage<Void> subscribe = subscribeToStreamingTypes(group);
final CompletionStage<Void> declare =
dittoProtocolSub.declareAcknowledgementLabels(getDeclaredAcks(), getSelf(), group);
return declare.thenCompose(unused -> subscribe);
}
}

private CompletionStage<Void> subscribeToStreamingTypes(final String pubSubGroup) {
final Set<StreamingType> streamingTypes = getUniqueStreamingTypes();
if (streamingTypes.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return dittoProtocolSub.subscribe(streamingTypes, getTargetAuthSubjects(), getSelf(), pubSubGroup);
}

private Set<AcknowledgementLabel> getDeclaredAcks() {
return ConnectionValidator.getAcknowledgementLabelsToDeclare(connection).collect(Collectors.toSet());
}
Expand All @@ -1791,22 +1798,25 @@ private Set<StreamingType> getUniqueStreamingTypes() {
.flatMap(target -> target.getTopics().stream()
.map(FilteredTopic::getTopic)
.map(BaseClientActor::toStreamingTypes))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());
}

private static StreamingType toStreamingTypes(final Topic topic) {
private static Optional<StreamingType> toStreamingTypes(final Topic topic) {
switch (topic) {
case POLICY_ANNOUNCEMENTS:
return StreamingType.POLICY_ANNOUNCEMENTS;
return Optional.of(StreamingType.POLICY_ANNOUNCEMENTS);
case LIVE_EVENTS:
return StreamingType.LIVE_EVENTS;
return Optional.of(StreamingType.LIVE_EVENTS);
case LIVE_COMMANDS:
return StreamingType.LIVE_COMMANDS;
return Optional.of(StreamingType.LIVE_COMMANDS);
case LIVE_MESSAGES:
return StreamingType.MESSAGES;
return Optional.of(StreamingType.MESSAGES);
case TWIN_EVENTS:
return Optional.of(StreamingType.EVENTS);
default:
return StreamingType.EVENTS;
return Optional.empty();
}
}

Expand Down
2 changes: 1 addition & 1 deletion thingsearch/service/src/main/resources/things-search.conf
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ ditto {
}

things-search {
query-criteria-validator.implementation = ${?QUERY_CRITERIA_VALIDATOR_IMPLEMENTATION} // TODO ff also rename
query-criteria-validator.implementation = ${?QUERY_CRITERIA_VALIDATOR_IMPLEMENTATION}
mongo-hints-by-namespace = ${?MONGO_HINTS_BY_NAMESPACE}

index-initialization {
Expand Down

0 comments on commit 5b43125

Please sign in to comment.