Skip to content

Commit

Permalink
[mqtt] allow disabling discovery (openhab#8077)
Browse files Browse the repository at this point in the history
* allow disabling discovery

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
  • Loading branch information
J-N-K authored and andrewfg committed Aug 31, 2020
1 parent 108d0a3 commit 0651b30
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 22 deletions.
5 changes: 5 additions & 0 deletions bundles/org.openhab.binding.mqtt/README.md
Expand Up @@ -54,6 +54,11 @@ For more security, the following optional parameters can be altered:
* __certificate__: The certificate hash. If **certificatepin** is set this hash is used to verify the connection. Clear to allow a new certificate pinning on the next connection attempt. If empty will be filled automatically by the next successful connection. An example input would be `SHA-256:83F9171E06A313118889F7D79302BD1B7A2042EE0CFD029ABF8DD06FFA6CD9D3`.
* __publickey__: The public key hash. If **publickeypin** is set this hash is used to verify the connection. Clear to allow a new public key pinning on the next connection attempt. If empty will be filled automatically by the next successful connection. An example input would be `SHA-256:83F9171E06A313118889F7D79302BD1B7A2042EE0CFD029ABF8DD06FFA6CD9D3`.

By default discovery services (like homie or homeassistant) are enabled on a broker.
This behaviour can be controlled with a configuration parameter.

* __enableDiscovery__:If set to true, enables discovery on this broker, if set to false, disables discovery services on this broker.

## Supported Channels

You can extend your broker connection bridges with a channel:
Expand Down
Expand Up @@ -32,6 +32,8 @@ public class TopicSubscribe implements MqttMessageSubscriber {
final String topic;
final MQTTTopicDiscoveryParticipant topicDiscoveredListener;

private boolean isStarted = false;

/**
* Creates a {@link TopicSubscribe} object.
*
Expand Down Expand Up @@ -66,7 +68,10 @@ public void processMessage(String topic, byte[] payload) {
* @return Completes with true if successful. Completes with false if not connected yet. Exceptionally otherwise.
*/
public CompletableFuture<Boolean> start() {
return connection == null ? CompletableFuture.completedFuture(true) : connection.subscribe(topic, this);
CompletableFuture<Boolean> startFuture = connection == null ? CompletableFuture.completedFuture(true)
: connection.subscribe(topic, this);
isStarted = true;
return startFuture;
}

/**
Expand All @@ -75,6 +80,18 @@ public CompletableFuture<Boolean> start() {
* @return Completes with true if successful. Exceptionally otherwise.
*/
public CompletableFuture<Boolean> stop() {
return connection == null ? CompletableFuture.completedFuture(true) : connection.unsubscribe(topic, this);
CompletableFuture<Boolean> stopFuture = connection == null ? CompletableFuture.completedFuture(true)
: connection.unsubscribe(topic, this);
isStarted = false;
return stopFuture;
}

/**
* status of this topic subscription
*
* @return true if started
*/
public boolean isStarted() {
return isStarted;
}
}
Expand Up @@ -118,17 +118,23 @@ public void initialize() {

discoveryTopics.forEach((topic, listenerMap) -> {
listenerMap.replaceAll((listener, oldTopicSubscribe) -> {
if (oldTopicSubscribe.isStarted()) {
oldTopicSubscribe.stop();
}

TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
topicSubscribe.start().handle((result, ex) -> {
if (ex != null) {
logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
} else {
logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
}
return null;
});
if (discoveryEnabled()) {
topicSubscribe.start().handle((result, ex) -> {
if (ex != null) {
logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
} else {
logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
}
return null;
});
}
return topicSubscribe;
});
});
Expand Down Expand Up @@ -197,15 +203,18 @@ public final void registerDiscoveryListener(MQTTTopicDiscoveryParticipant listen
}

TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
topicSubscribe.start().handle((result, ex) -> {
if (ex != null) {
logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
} else {
logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic, thing.getUID());
}
return null;
});
if (discoveryEnabled()) {
topicSubscribe.start().handle((result, ex) -> {
if (ex != null) {
logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
} else {
logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
}
return null;
});
}
return topicSubscribe;
});
}
Expand Down Expand Up @@ -240,4 +249,11 @@ public final void unregisterDiscoveryListener(MQTTTopicDiscoveryParticipant list
return v.isEmpty() ? null : v;
});
}

/**
* check whether discovery is disabled on this broker
*
* @return true if discovery disabled
*/
public abstract boolean discoveryEnabled();
}
Expand Up @@ -123,6 +123,11 @@ public void dispose() {
super.dispose();
}

@Override
public boolean discoveryEnabled() {
return config.enableDiscovery;
}

/**
* Reads the thing configuration related to public key or certificate pinning, creates an appropriate a
* {@link PinningSSLContextProvider} and assigns it to the {@link MqttBrokerConnection} instance.
Expand Down
Expand Up @@ -34,4 +34,6 @@ public class BrokerHandlerConfig extends MqttBrokerConnectionConfig {
public boolean publickeypin = false;
public String certificate = "";
public String publickey = "";

public boolean enableDiscovery = true;
}
Expand Up @@ -50,6 +50,7 @@ public class SystemBrokerHandler extends AbstractBrokerHandler implements MqttSe
protected final MqttService service;

protected String brokerID = "";
protected boolean discoveryEnabled = true;

public SystemBrokerHandler(Bridge thing, MqttService service) {
super(thing);
Expand Down Expand Up @@ -116,6 +117,8 @@ public void brokerRemoved(String connectionName, MqttBrokerConnection removedCon
@Override
public void initialize() {
this.brokerID = getThing().getConfiguration().get("brokerid").toString();
this.discoveryEnabled = (Boolean) getThing().getConfiguration().get("enableDiscovery");

service.addBrokersListener(this);

connection = service.getBrokerConnection(brokerID);
Expand All @@ -132,4 +135,9 @@ public void dispose() {
service.removeBrokersListener(this);
super.dispose();
}

@Override
public boolean discoveryEnabled() {
return discoveryEnabled;
}
}
Expand Up @@ -155,6 +155,12 @@
`SHA-256:83F9171E06A313118889F7D79302BD1B7A2042EE0CFD029ABF8DD06FFA6CD9D3`</description>
<advanced>true</advanced>
</parameter>
<parameter name="enableDiscovery" type="boolean">
<label>Enable Discovery</label>
<description>If set to true enables this broker for all discovery services.</description>
<advanced>true</advanced>
<default>true</default>
</parameter>
</config-description>
</bridge-type>

Expand All @@ -181,6 +187,12 @@
<label>Broker ID</label>
<description>Each system wide configured MQTT broker has a unique broker ID.</description>
</parameter>
<parameter name="enableDiscovery" type="boolean">
<label>Enable Discovery</label>
<description>If set to true enables this broker for all discovery services.</description>
<advanced>true</advanced>
<default>true</default>
</parameter>
</config-description>
</bridge-type>

Expand Down
Expand Up @@ -104,7 +104,7 @@ public void firstSubscribeThenHandler() {
}

@Test
public void firstHandlerThanSubscribe() {
public void firstHandlerThenSubscribe() {
handler.initialize();
BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);

Expand Down

0 comments on commit 0651b30

Please sign in to comment.