From 6715e7a9798b0f048164bea9b3aad682bb7f7d6c Mon Sep 17 00:00:00 2001 From: Andrew Fiddian-Green Date: Sun, 8 Aug 2021 13:18:01 +0100 Subject: [PATCH] [mqtt] Discovery services shall not unsubscribe unless they have already subscribed (#10566) * [mqqt] do not allow unsubscribe unless already subscribed Signed-off-by: Andrew Fiddian-Green --- .../mqtt/discovery/AbstractMQTTDiscovery.java | 30 +++++++++-- .../internal/MqttBrokerHandlerFactory.java | 51 +++++++++++++------ 2 files changed, 61 insertions(+), 20 deletions(-) diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java index 5b83320b1de9e..0732387a221ff 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java @@ -16,6 +16,7 @@ import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -48,11 +49,32 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp private @Nullable ScheduledFuture scheduledStop; + private AtomicBoolean isSubscribed; + public AbstractMQTTDiscovery(@Nullable Set supportedThingTypes, int timeout, boolean backgroundDiscoveryEnabledByDefault, String baseTopic) { super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault); this.subscribeTopic = baseTopic; this.timeout = timeout; + isSubscribed = new AtomicBoolean(false); + } + + /** + * Only subscribe if we were not already subscribed + */ + private void subscribe() { + if (!isSubscribed.getAndSet(true)) { + getDiscoveryService().subscribe(this, subscribeTopic); + } + } + + /** + * Only unsubscribe if we were already subscribed + */ + private void unSubscribe() { + if (isSubscribed.getAndSet(false)) { + getDiscoveryService().unsubscribe(this); + } } /** @@ -94,7 +116,7 @@ protected void startScan() { return; } resetTimeout(); - getDiscoveryService().subscribe(this, subscribeTopic); + subscribe(); } @Override @@ -104,7 +126,7 @@ protected synchronized void stopScan() { return; } stopTimeout(); - getDiscoveryService().unsubscribe(this); + unSubscribe(); super.stopScan(); } @@ -118,11 +140,11 @@ public synchronized void abortScan() { protected void startBackgroundDiscovery() { // Remove results that are restored after a restart removeOlderResults(new Date().getTime()); - getDiscoveryService().subscribe(this, subscribeTopic); + subscribe(); } @Override protected void stopBackgroundDiscovery() { - getDiscoveryService().unsubscribe(this); + unSubscribe(); } } diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java index 5a8eec82b3ef1..491f6e1b12ce6 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java @@ -12,13 +12,11 @@ */ package org.openhab.binding.mqtt.internal; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -54,11 +52,22 @@ @Component(service = { ThingHandlerFactory.class, MQTTTopicDiscoveryService.class }, configurationPid = "MqttBrokerHandlerFactory") public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements MQTTTopicDiscoveryService { + private static final Set SUPPORTED_THING_TYPES_UIDS = Stream .of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER) .collect(Collectors.toSet()); + private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class); - protected final Map> discoveryTopics = new HashMap<>(); + + /** + * This Map provides a lookup between a Topic string (key) and a Set of MQTTTopicDiscoveryParticipants (value), + * where the Set itself is a list of participants which are subscribed to the respective Topic. + */ + protected final Map> discoveryTopics = new ConcurrentHashMap<>(); + + /** + * This Set contains a list of all the Broker handlers that have been created by this factory + */ protected final Set handlers = Collections .synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>())); @@ -75,12 +84,13 @@ public boolean supportsThingType(ThingTypeUID thingTypeUID) { } /** - * Add the given broker connection to all listeners. + * Add the given broker handler to the list of known handlers. And then iterate over all topics and their respective + * list of listeners, and register the respective new listener and topic with the given new broker handler. */ protected void createdHandler(AbstractBrokerHandler handler) { handlers.add(handler); - discoveryTopics.forEach((topic, listenerList) -> { - listenerList.forEach(listener -> { + discoveryTopics.forEach((topic, listeners) -> { + listeners.forEach(listener -> { handler.registerDiscoveryListener(listener, topic); }); }); @@ -111,24 +121,33 @@ protected void createdHandler(AbstractBrokerHandler handler) { /** * This factory also implements {@link MQTTTopicDiscoveryService} so consumers can subscribe to * a MQTT topic that is registered on all available broker connections. + * + * Checks each topic, and if the listener is not already in the listener list for that topic, adds itself from that + * list, and registers itself and the respective topic with all the known brokers. */ @Override + @SuppressWarnings("null") public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) { - List listenerList = discoveryTopics.computeIfAbsent(topic, - t -> new ArrayList<>()); - listenerList.add(listener); - handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic)); + Set listeners = discoveryTopics.computeIfAbsent(topic, + t -> ConcurrentHashMap.newKeySet()); + if (listeners.add(listener)) { + handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic)); + } } /** - * Unsubscribe a listener from all available broker connections. + * This factory also implements {@link MQTTTopicDiscoveryService} so consumers can unsubscribe from + * a MQTT topic that is registered on all available broker connections. + * + * Checks each topic, and if the listener is in the listener list for that topic, removes itself from that list, and + * unregisters itself and the respective topic from all the known brokers. */ @Override - @SuppressWarnings("null") public void unsubscribe(MQTTTopicDiscoveryParticipant listener) { - discoveryTopics.forEach((topic, listenerList) -> { - listenerList.remove(listener); - handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic)); + discoveryTopics.forEach((topic, listeners) -> { + if (listeners.remove(listener)) { + handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic)); + } }); }