Skip to content

Commit

Permalink
unsubscribe from registered topics (not only thing events)
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Apr 13, 2018
1 parent cfa48f0 commit 1b59a96
Showing 1 changed file with 19 additions and 14 deletions.
Expand Up @@ -69,7 +69,6 @@
import org.eclipse.ditto.signals.events.connectivity.ConnectionCreated;
import org.eclipse.ditto.signals.events.connectivity.ConnectionDeleted;
import org.eclipse.ditto.signals.events.connectivity.ConnectionOpened;
import org.eclipse.ditto.signals.events.things.ThingEvent;

import com.typesafe.config.Config;

Expand Down Expand Up @@ -133,7 +132,7 @@ final class ConnectionActor extends AbstractPersistentActor {
private long lastSnapshotSequenceNr = -1L;
private boolean snapshotInProgress = false;

private Set<String> uniqueTopicPaths;
private Set<String> uniqueTopicPaths = Collections.emptySet();

private ConnectionActor(final String connectionId, final ActorRef pubSubMediator,
final ConnectionActorPropsFactory propsFactory) {
Expand Down Expand Up @@ -509,25 +508,31 @@ private void subscribeForEvents() {
.flatMap(target -> target.getTopics().stream())
.collect(Collectors.toSet());

final Set<String> pubSubTopics = uniqueTopicPaths.stream()
.map(TopicPathMapper::mapToPubSubTopic)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());

pubSubTopics.forEach(pubSubTopic -> {
forEachPubSubTopicDo(pubSubTopic -> {
final DistributedPubSubMediator.Subscribe subscribe =
new DistributedPubSubMediator.Subscribe(pubSubTopic, PUB_SUB_GROUP_PREFIX + connectionId,
getSelf());
log.info("Subscribing to pubsub topic '{}' for connection '{}'.", pubSubTopic, connectionId);
log.debug("Subscribing to pubsub topic '{}' for connection '{}'.", pubSubTopic, connectionId);
pubSubMediator.tell(subscribe, getSelf());
});
}

private void unsubscribeFromEvents() {
pubSubMediator.tell(
new DistributedPubSubMediator.Unsubscribe(ThingEvent.TYPE_PREFIX, PUB_SUB_GROUP_PREFIX + connectionId,
getSelf()), getSelf());
forEachPubSubTopicDo(pubSubTopic -> {
log.debug("Unsubscribing from pubsub topic '{}' for connection '{}'.", pubSubTopic, connectionId);
final DistributedPubSubMediator.Unsubscribe unsubscribe =
new DistributedPubSubMediator.Unsubscribe(pubSubTopic, PUB_SUB_GROUP_PREFIX + connectionId,
getSelf());
pubSubMediator.tell(unsubscribe, getSelf());
});
}

private void forEachPubSubTopicDo(final Consumer<String> topicConsumer) {
uniqueTopicPaths.stream()
.map(TopicPathMapper::mapToPubSubTopic)
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(topicConsumer);
}

private void handleCommandDuringInitialization(final ConnectivityCommand command) {
Expand Down Expand Up @@ -697,7 +702,7 @@ public Receive createReceive() {
aggregatedResults.add((CommandResponse<?>) any);
} else if (any instanceof Status.Status) {
aggregatedStatus.put(getSender().path().address().hostPort(), (Status.Status) any);
} else if (any instanceof DittoRuntimeException){
} else if (any instanceof DittoRuntimeException) {
aggregatedResults.add(ConnectivityErrorResponse.of((DittoRuntimeException) any));
} else {
log.error("Could not handle non-Jsonifiable non-Status response: {}", any);
Expand Down

0 comments on commit 1b59a96

Please sign in to comment.