Skip to content

Commit

Permalink
[mqtt] avoid duplicates in listenerList; only (un)register topicd if …
Browse files Browse the repository at this point in the history
…listener in listenerList

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
  • Loading branch information
andrewfg committed Apr 25, 2021
1 parent 6d4b890 commit c0d00c9
Showing 1 changed file with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
*/
package org.openhab.binding.mqtt.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
Expand Down Expand Up @@ -58,7 +57,7 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements
.of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER)
.collect(Collectors.toSet());
private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class);
protected final Map<String, List<MQTTTopicDiscoveryParticipant>> discoveryTopics = new HashMap<>();
protected final Map<String, Set<MQTTTopicDiscoveryParticipant>> discoveryTopics = new HashMap<>();
protected final Set<AbstractBrokerHandler> handlers = Collections
.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));

Expand Down Expand Up @@ -113,11 +112,12 @@ protected void createdHandler(AbstractBrokerHandler handler) {
* a MQTT topic that is registered on all available broker connections.
*/
@Override
@SuppressWarnings("null")
public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) {
List<MQTTTopicDiscoveryParticipant> listenerList = discoveryTopics.computeIfAbsent(topic,
t -> new ArrayList<>());
listenerList.add(listener);
handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
Set<MQTTTopicDiscoveryParticipant> listenerList = discoveryTopics.computeIfAbsent(topic, t -> new HashSet<>());
if (listenerList.add(listener)) {
handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
}
}

/**
Expand All @@ -127,8 +127,9 @@ public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) {
@SuppressWarnings("null")
public void unsubscribe(MQTTTopicDiscoveryParticipant listener) {
discoveryTopics.forEach((topic, listenerList) -> {
listenerList.remove(listener);
handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
if (listenerList.remove(listener)) {
handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
}
});
}

Expand Down

0 comments on commit c0d00c9

Please sign in to comment.