diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTests.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTests.java index 9688f6805cb..0b2e535f3ae 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTests.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTests.java @@ -59,12 +59,12 @@ public void extractBrokerConfigurationsTests() throws ConfigurationException, Mq Map properties = new Hashtable<>(); properties.put("bam.name", "brokername"); properties.put("bam.url", "tcp://123.123.123.123"); - Map> map = service.extractBrokerConfigurations(properties); + Map map = service.extractBrokerConfigurations(properties); assertEquals(map.size(), 1); - Map data = map.get("bam"); + MqttService.Config data = map.get("bam"); assertNotNull(data); - assertEquals("brokername", data.get("name")); - assertEquals("tcp://123.123.123.123", data.get("url")); + assertEquals("brokername", data.name); + assertEquals("tcp://123.123.123.123", data.url); } // Tests if updates to the textual configuration are processed correctly diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java index 7d62ced9dc0..0ee65f6baf5 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java @@ -12,6 +12,7 @@ */ package org.eclipse.smarthome.io.transport.mqtt; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -20,6 +21,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; import javax.naming.ConfigurationException; @@ -51,6 +53,70 @@ "service.pid=org.eclipse.smarthome.mqtt" }) @NonNullByDefault public class MqttService { + public static class Config { + public final @Nullable String name; + public final @Nullable String url; + public final @Nullable String user; + public final @Nullable String pwd; + public final @Nullable String clientId; + public final @Nullable Integer keepAlive; + public final @Nullable Integer qos; + public final @Nullable Boolean retain; + public final @Nullable String lwt; + public final @Nullable String lwtTopic; + public final byte @Nullable [] lwtMessage; + public final @Nullable Integer lwtQos; + public final @Nullable Boolean lwtRetain; + + public Config(final Map cfg) { + name = cfg.get(NAME_PROPERTY); + url = cfg.get("url"); + user = cfg.get("user"); + pwd = cfg.get("pwd"); + clientId = cfg.get("clientId"); + keepAlive = asInt(cfg.get("keepAlive")); + qos = asInt(cfg.get("qos")); + retain = asBool(cfg.get("retain")); + + // Check for Last Will and Testament + lwt = cfg.get("lwt"); + + // Inspect explicit given LWT values + lwtTopic = cfg.get("lwtTopic"); + String tmp = cfg.get("lwtMessage"); + if (tmp != null) { + lwtMessage = tmp.getBytes(StandardCharsets.UTF_8); + } else { + lwtMessage = null; + } + Integer tmpInt = asInt(cfg.get("lwtQos")); + if (tmpInt != null && tmpInt >= 0 && tmpInt <= 2) { + lwtQos = tmpInt; + } else { + lwtQos = null; + } + lwtRetain = asBool(cfg.get("lwtRetain")); + } + + private static @Nullable Integer asInt(final @Nullable String value) { + if (value == null || value.isEmpty()) { + return null; + } + try { + return Integer.valueOf(value); + } catch (final NumberFormatException ex) { + return null; + } + } + + private static @Nullable Boolean asBool(final @Nullable String value) { + if (value == null || value.isEmpty()) { + return null; + } + return Boolean.valueOf(value); + } + } + private static final String NAME_PROPERTY = "name"; private final Logger logger = LoggerFactory.getLogger(MqttService.class); private final Map brokerConnections = new ConcurrentHashMap(); @@ -71,7 +137,7 @@ public class MqttService { * @param properties Service configuration * @return A 'list' of broker configurations as key-value maps. A configuration map at least contains a "name". */ - public Map> extractBrokerConfigurations(Map properties) { + public Map extractBrokerConfigurations(Map properties) { Map> configPerBroker = new HashMap>(); for (Entry entry : properties.entrySet()) { String key = entry.getKey(); @@ -106,7 +172,8 @@ public Map> extractBrokerConfigurations(Map entry.getKey(), entry -> new Config(entry.getValue()))); } /** @@ -133,9 +200,9 @@ public void modified(Map config) { return; } - Map> brokerConfigs = extractBrokerConfigurations(config); + Map brokerConfigs = extractBrokerConfigurations(config); - for (Map brokerConfig : brokerConfigs.values()) { + for (Config brokerConfig : brokerConfigs.values()) { try { final MqttBrokerConnection conn = addBrokerConnection(brokerConfig); if (conn == null) { @@ -228,28 +295,31 @@ public boolean addBrokerConnection(MqttBrokerConnection connection) { } /** - * Add a broker by a configuration key-value map. You need to provide at least a "name" and an "url". + * Add a broker by a configuration. + * + *

+ * You need to provide at least a "name" and an "url". * Additional properties are "user","pwd","qos","retain","lwt","keepAlive","clientId", please read the * service configuration documentation for a detailed description. * - * @param brokerConnectionConfig The configuration key-value map. + * @param cfg The configuration key-value map. * @return Returns the created broker connection or null if there is already a connection with the same name. * @throws ConfigurationException Most likely your provided name and url are invalid. * @throws MqttException */ - public @Nullable MqttBrokerConnection addBrokerConnection(Map brokerConnectionConfig) - throws ConfigurationException, MqttException { + public @Nullable MqttBrokerConnection addBrokerConnection(Config cfg) throws ConfigurationException, MqttException { // Extract mandatory fields - String brokerID = brokerConnectionConfig.get(NAME_PROPERTY); + String brokerID = cfg.name; if (brokerID == null || brokerID.isEmpty()) { throw new ConfigurationException("MQTT Broker property 'name' is not provided"); } brokerID = brokerID.toLowerCase(); - final String brokerURL = brokerConnectionConfig.get("url"); + final String brokerURL = cfg.url; if (brokerURL == null || brokerURL.isEmpty()) { throw new ConfigurationException("MQTT Broker property 'url' is not provided"); } + // Add the connection MqttBrokerConnection connection; synchronized (brokerConnections) { @@ -262,21 +332,19 @@ public boolean addBrokerConnection(MqttBrokerConnection connection) { } // Extract further configurations - connection.setCredentials(brokerConnectionConfig.get("user"), brokerConnectionConfig.get("pwd")); - connection.setClientId(brokerConnectionConfig.get("clientId")); - String property = brokerConnectionConfig.get("keepAlive"); - if (!StringUtils.isBlank(property)) { - connection.setKeepAliveInterval(Integer.valueOf(property)); + connection.setCredentials(cfg.user, cfg.pwd); + connection.setClientId(cfg.clientId); + if (cfg.keepAlive != null) { + connection.setKeepAliveInterval(cfg.keepAlive); } - property = brokerConnectionConfig.get("qos"); - if (!StringUtils.isBlank(property)) { - connection.setQos(Integer.valueOf(property)); + if (cfg.qos != null) { + connection.setQos(cfg.qos); } - property = brokerConnectionConfig.get("retain"); - if (!StringUtils.isBlank(property)) { - connection.setRetain(Boolean.valueOf(property)); + if (cfg.retain != null) { + connection.setRetain(cfg.retain); } - MqttWillAndTestament will = MqttWillAndTestament.fromString(brokerConnectionConfig.get("lwt")); + MqttWillAndTestament will = MqttWillAndTestament.fromString(cfg.lwt, cfg.lwtTopic, cfg.lwtMessage, cfg.lwtQos, + cfg.lwtRetain); if (will != null) { logger.debug("Setting last will: {}", will); connection.setLastWill(will); diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttWillAndTestament.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttWillAndTestament.java index 8f50103ac4e..383b5bcec66 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttWillAndTestament.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttWillAndTestament.java @@ -12,7 +12,11 @@ */ package org.eclipse.smarthome.io.transport.mqtt; +import java.nio.charset.StandardCharsets; + import org.apache.commons.lang.StringUtils; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; /** * Class encapsulating the last will and testament that is published after the client has gone offline. @@ -20,11 +24,15 @@ * @author Markus Mann * */ +@NonNullByDefault public class MqttWillAndTestament { - private String topic; - private byte[] payload; - private int qos = 0; - private boolean retain = false; + private static final int DFL_QOS = 0; + private static final boolean DFL_RETAIN = false; + + private final String topic; + private final byte @Nullable [] payload; + private final int qos; + private final boolean retain; /** * Create an instance of the last will using a string with the following format:
@@ -40,42 +48,65 @@ public class MqttWillAndTestament { * @param string the string to parse. If null, null is returned * @return the will instance, will be null only if parameter is null */ - public static MqttWillAndTestament fromString(String string) { - if (string == null) { - return null; - } - MqttWillAndTestament result = new MqttWillAndTestament(); - String[] components = string.split(":"); - for (int i = 0; i < Math.min(components.length, 4); i++) { - String value = StringUtils.trimToEmpty(components[i]); - switch (i) { - case 0: - result.topic = value; - break; - case 1: - result.payload = value.getBytes(); - break; - case 2: - if (!"".equals(value)) { - int qos = Integer.valueOf(value); - if (qos >= 0 && qos <= 2) { - result.qos = qos; + public static @Nullable MqttWillAndTestament fromString(@Nullable String string) { + return fromString(string, null, null, null, null); + } + + public static @Nullable MqttWillAndTestament fromString(@Nullable String string, @Nullable String topic, + byte @Nullable [] payload, @Nullable Integer qos, @Nullable Boolean retain) { + String tmpTopic = null; + byte[] tmpPayload = null; + int tmpQos = DFL_QOS; + boolean tmpRetain = DFL_RETAIN; + + // Parse string if given. + if (string != null) { + String[] components = string.split(":"); + for (int i = 0; i < Math.min(components.length, 4); i++) { + String value = StringUtils.trimToEmpty(components[i]); + switch (i) { + case 0: + tmpTopic = value; + break; + case 1: + tmpPayload = value.getBytes(StandardCharsets.UTF_8); + break; + case 2: + if (!"".equals(value)) { + int tmp = Integer.valueOf(value); + if (tmp >= 0 && tmp <= 2) { + tmpQos = tmp; + } } - } - break; - case 3: - result.retain = Boolean.valueOf(value); - break; + break; + case 3: + tmpRetain = Boolean.valueOf(value); + break; + } } } - return result.isValid() ? result : null; - } - /** - * Hide the constructor and force consumers to use the fromString() method or the - * constructor requiring all field parameters to be set. - */ - private MqttWillAndTestament() { + // Use explicit given values. + if (topic != null) { + tmpTopic = topic; + } + if (payload != null) { + tmpPayload = payload; + } + if (qos != null) { + tmpQos = qos; + } + if (retain != null) { + tmpRetain = retain; + } + + // Check if valid + if (tmpTopic == null || tmpTopic.isEmpty()) { + return null; + } + + // Create MQTT Last Will and Testament object + return new MqttWillAndTestament(tmpTopic, tmpPayload, tmpQos, tmpRetain); } /** @@ -86,7 +117,7 @@ private MqttWillAndTestament() { * @param qos Valid values are 0 (Deliver at most once),1 (Deliver at least once) or 2 * @param retain true if messages shall be retained */ - public MqttWillAndTestament(String topic, byte[] payload, int qos, boolean retain) { + public MqttWillAndTestament(String topic, byte @Nullable [] payload, int qos, boolean retain) { if (StringUtils.isBlank(topic)) { throw new IllegalArgumentException("Topic must be set"); } @@ -96,13 +127,6 @@ public MqttWillAndTestament(String topic, byte[] payload, int qos, boolean retai this.retain = retain; } - /** - * Return true if the last will and testament object is valid. - */ - private boolean isValid() { - return !StringUtils.isBlank(topic); - } - /** * @return the topic for the last will. */ @@ -113,7 +137,7 @@ public String getTopic() { /** * @return the payload of the last will. */ - public byte[] getPayload() { + public byte @Nullable [] getPayload() { return payload; }