This repository has been archived by the owner on May 7, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 787
MQTT last will and testament #5231
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
*/ | ||
package org.eclipse.smarthome.io.transport.mqtt; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
|
@@ -20,6 +21,7 @@ | |
import java.util.Map.Entry; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
import java.util.stream.Collectors; | ||
|
||
import javax.naming.ConfigurationException; | ||
|
||
|
@@ -51,6 +53,70 @@ | |
"service.pid=org.eclipse.smarthome.mqtt" }) | ||
@NonNullByDefault | ||
public class MqttService { | ||
public static class Config { | ||
public final @Nullable String name; | ||
public final @Nullable String url; | ||
public final @Nullable String user; | ||
public final @Nullable String pwd; | ||
public final @Nullable String clientId; | ||
public final @Nullable Integer keepAlive; | ||
public final @Nullable Integer qos; | ||
public final @Nullable Boolean retain; | ||
public final @Nullable String lwt; | ||
public final @Nullable String lwtTopic; | ||
public final byte @Nullable [] lwtMessage; | ||
public final @Nullable Integer lwtQos; | ||
public final @Nullable Boolean lwtRetain; | ||
|
||
public Config(final Map<String, String> cfg) { | ||
name = cfg.get(NAME_PROPERTY); | ||
url = cfg.get("url"); | ||
user = cfg.get("user"); | ||
pwd = cfg.get("pwd"); | ||
clientId = cfg.get("clientId"); | ||
keepAlive = asInt(cfg.get("keepAlive")); | ||
qos = asInt(cfg.get("qos")); | ||
retain = asBool(cfg.get("retain")); | ||
|
||
// Check for Last Will and Testament | ||
lwt = cfg.get("lwt"); | ||
|
||
// Inspect explicit given LWT values | ||
lwtTopic = cfg.get("lwtTopic"); | ||
String tmp = cfg.get("lwtMessage"); | ||
if (tmp != null) { | ||
lwtMessage = tmp.getBytes(StandardCharsets.UTF_8); | ||
} else { | ||
lwtMessage = null; | ||
} | ||
Integer tmpInt = asInt(cfg.get("lwtQos")); | ||
if (tmpInt != null && tmpInt >= 0 && tmpInt <= 2) { | ||
lwtQos = tmpInt; | ||
} else { | ||
lwtQos = null; | ||
} | ||
lwtRetain = asBool(cfg.get("lwtRetain")); | ||
} | ||
|
||
private static @Nullable Integer asInt(final @Nullable String value) { | ||
if (value == null || value.isEmpty()) { | ||
return null; | ||
} | ||
try { | ||
return Integer.valueOf(value); | ||
} catch (final NumberFormatException ex) { | ||
return null; | ||
} | ||
} | ||
|
||
private static @Nullable Boolean asBool(final @Nullable String value) { | ||
if (value == null || value.isEmpty()) { | ||
return null; | ||
} | ||
return Boolean.valueOf(value); | ||
} | ||
} | ||
|
||
private static final String NAME_PROPERTY = "name"; | ||
private final Logger logger = LoggerFactory.getLogger(MqttService.class); | ||
private final Map<String, MqttBrokerConnection> brokerConnections = new ConcurrentHashMap<String, MqttBrokerConnection>(); | ||
|
@@ -71,7 +137,7 @@ public class MqttService { | |
* @param properties Service configuration | ||
* @return A 'list' of broker configurations as key-value maps. A configuration map at least contains a "name". | ||
*/ | ||
public Map<String, Map<String, String>> extractBrokerConfigurations(Map<String, Object> properties) { | ||
public Map<String, Config> extractBrokerConfigurations(Map<String, Object> properties) { | ||
Map<String, Map<String, String>> configPerBroker = new HashMap<String, Map<String, String>>(); | ||
for (Entry<String, Object> entry : properties.entrySet()) { | ||
String key = entry.getKey(); | ||
|
@@ -106,7 +172,8 @@ public Map<String, Map<String, String>> extractBrokerConfigurations(Map<String, | |
brokerConfig.put(subkeys[1], value); | ||
} | ||
|
||
return configPerBroker; | ||
return configPerBroker.entrySet().stream() | ||
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> new Config(entry.getValue()))); | ||
} | ||
|
||
/** | ||
|
@@ -133,9 +200,9 @@ public void modified(Map<String, Object> config) { | |
return; | ||
} | ||
|
||
Map<String, Map<String, String>> brokerConfigs = extractBrokerConfigurations(config); | ||
Map<String, Config> brokerConfigs = extractBrokerConfigurations(config); | ||
|
||
for (Map<String, String> brokerConfig : brokerConfigs.values()) { | ||
for (Config brokerConfig : brokerConfigs.values()) { | ||
try { | ||
final MqttBrokerConnection conn = addBrokerConnection(brokerConfig); | ||
if (conn == null) { | ||
|
@@ -228,28 +295,31 @@ public boolean addBrokerConnection(MqttBrokerConnection connection) { | |
} | ||
|
||
/** | ||
* Add a broker by a configuration key-value map. You need to provide at least a "name" and an "url". | ||
* Add a broker by a configuration. | ||
* | ||
* <p> | ||
* You need to provide at least a "name" and an "url". | ||
* Additional properties are "user","pwd","qos","retain","lwt","keepAlive","clientId", please read the | ||
* service configuration documentation for a detailed description. | ||
* | ||
* @param brokerConnectionConfig The configuration key-value map. | ||
* @param cfg The configuration key-value map. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is not a key-value map anymore |
||
* @return Returns the created broker connection or null if there is already a connection with the same name. | ||
* @throws ConfigurationException Most likely your provided name and url are invalid. | ||
* @throws MqttException | ||
*/ | ||
public @Nullable MqttBrokerConnection addBrokerConnection(Map<String, String> brokerConnectionConfig) | ||
throws ConfigurationException, MqttException { | ||
public @Nullable MqttBrokerConnection addBrokerConnection(Config cfg) throws ConfigurationException, MqttException { | ||
// Extract mandatory fields | ||
String brokerID = brokerConnectionConfig.get(NAME_PROPERTY); | ||
String brokerID = cfg.name; | ||
if (brokerID == null || brokerID.isEmpty()) { | ||
throw new ConfigurationException("MQTT Broker property 'name' is not provided"); | ||
} | ||
brokerID = brokerID.toLowerCase(); | ||
|
||
final String brokerURL = brokerConnectionConfig.get("url"); | ||
final String brokerURL = cfg.url; | ||
if (brokerURL == null || brokerURL.isEmpty()) { | ||
throw new ConfigurationException("MQTT Broker property 'url' is not provided"); | ||
} | ||
|
||
// Add the connection | ||
MqttBrokerConnection connection; | ||
synchronized (brokerConnections) { | ||
|
@@ -262,21 +332,19 @@ public boolean addBrokerConnection(MqttBrokerConnection connection) { | |
} | ||
|
||
// Extract further configurations | ||
connection.setCredentials(brokerConnectionConfig.get("user"), brokerConnectionConfig.get("pwd")); | ||
connection.setClientId(brokerConnectionConfig.get("clientId")); | ||
String property = brokerConnectionConfig.get("keepAlive"); | ||
if (!StringUtils.isBlank(property)) { | ||
connection.setKeepAliveInterval(Integer.valueOf(property)); | ||
connection.setCredentials(cfg.user, cfg.pwd); | ||
connection.setClientId(cfg.clientId); | ||
if (cfg.keepAlive != null) { | ||
connection.setKeepAliveInterval(cfg.keepAlive); | ||
} | ||
property = brokerConnectionConfig.get("qos"); | ||
if (!StringUtils.isBlank(property)) { | ||
connection.setQos(Integer.valueOf(property)); | ||
if (cfg.qos != null) { | ||
connection.setQos(cfg.qos); | ||
} | ||
property = brokerConnectionConfig.get("retain"); | ||
if (!StringUtils.isBlank(property)) { | ||
connection.setRetain(Boolean.valueOf(property)); | ||
if (cfg.retain != null) { | ||
connection.setRetain(cfg.retain); | ||
} | ||
MqttWillAndTestament will = MqttWillAndTestament.fromString(brokerConnectionConfig.get("lwt")); | ||
MqttWillAndTestament will = MqttWillAndTestament.fromString(cfg.lwt, cfg.lwtTopic, cfg.lwtMessage, cfg.lwtQos, | ||
cfg.lwtRetain); | ||
if (will != null) { | ||
logger.debug("Setting last will: {}", will); | ||
connection.setLastWill(will); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it would be useful to mention the new properties here ( lwtTopic, lwtMessage lwtQos lwtRetain)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation has been changed to use a config object that properties are "fixed". As all fields are nullable and so optional the documentation needs to state which ones are required.
I don't know if it make sense to sync the additional properties all the time.
Perhaps we could remove the "Additional properties ..." part at all if this PR will be accepted (and before it is merged).