Navigation Menu

Skip to content
This repository has been archived by the owner on May 7, 2020. It is now read-only.

Commit

Permalink
MQTT last will and testament (#5231)
Browse files Browse the repository at this point in the history
* MQTT transport: use a broker connection config class
* MQTT transport: improve "MQTT will and testament" class
* MQTT transport: add support to configure lwt parts

Fixes: #5197
Signed-off-by: Markus Rathgeb <maggu2810@gmail.com>
  • Loading branch information
maggu2810 authored and kaikreuzer committed Mar 14, 2018
1 parent 3d01f35 commit 421c1c0
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 71 deletions.
Expand Up @@ -59,12 +59,12 @@ public void extractBrokerConfigurationsTests() throws ConfigurationException, Mq
Map<String, Object> properties = new Hashtable<>();
properties.put("bam.name", "brokername");
properties.put("bam.url", "tcp://123.123.123.123");
Map<String, Map<String, String>> map = service.extractBrokerConfigurations(properties);
Map<String, MqttService.Config> map = service.extractBrokerConfigurations(properties);
assertEquals(map.size(), 1);
Map<String, String> data = map.get("bam");
MqttService.Config data = map.get("bam");
assertNotNull(data);
assertEquals("brokername", data.get("name"));
assertEquals("tcp://123.123.123.123", data.get("url"));
assertEquals("brokername", data.name);
assertEquals("tcp://123.123.123.123", data.url);
}

// Tests if updates to the textual configuration are processed correctly
Expand Down
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.smarthome.io.transport.mqtt;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -20,6 +21,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import javax.naming.ConfigurationException;

Expand Down Expand Up @@ -51,6 +53,70 @@
"service.pid=org.eclipse.smarthome.mqtt" })
@NonNullByDefault
public class MqttService {
public static class Config {
public final @Nullable String name;
public final @Nullable String url;
public final @Nullable String user;
public final @Nullable String pwd;
public final @Nullable String clientId;
public final @Nullable Integer keepAlive;
public final @Nullable Integer qos;
public final @Nullable Boolean retain;
public final @Nullable String lwt;
public final @Nullable String lwtTopic;
public final byte @Nullable [] lwtMessage;
public final @Nullable Integer lwtQos;
public final @Nullable Boolean lwtRetain;

public Config(final Map<String, String> cfg) {
name = cfg.get(NAME_PROPERTY);
url = cfg.get("url");
user = cfg.get("user");
pwd = cfg.get("pwd");
clientId = cfg.get("clientId");
keepAlive = asInt(cfg.get("keepAlive"));
qos = asInt(cfg.get("qos"));
retain = asBool(cfg.get("retain"));

// Check for Last Will and Testament
lwt = cfg.get("lwt");

// Inspect explicit given LWT values
lwtTopic = cfg.get("lwtTopic");
String tmp = cfg.get("lwtMessage");
if (tmp != null) {
lwtMessage = tmp.getBytes(StandardCharsets.UTF_8);
} else {
lwtMessage = null;
}
Integer tmpInt = asInt(cfg.get("lwtQos"));
if (tmpInt != null && tmpInt >= 0 && tmpInt <= 2) {
lwtQos = tmpInt;
} else {
lwtQos = null;
}
lwtRetain = asBool(cfg.get("lwtRetain"));
}

private static @Nullable Integer asInt(final @Nullable String value) {
if (value == null || value.isEmpty()) {
return null;
}
try {
return Integer.valueOf(value);
} catch (final NumberFormatException ex) {
return null;
}
}

private static @Nullable Boolean asBool(final @Nullable String value) {
if (value == null || value.isEmpty()) {
return null;
}
return Boolean.valueOf(value);
}
}

private static final String NAME_PROPERTY = "name";
private final Logger logger = LoggerFactory.getLogger(MqttService.class);
private final Map<String, MqttBrokerConnection> brokerConnections = new ConcurrentHashMap<String, MqttBrokerConnection>();
Expand All @@ -71,7 +137,7 @@ public class MqttService {
* @param properties Service configuration
* @return A 'list' of broker configurations as key-value maps. A configuration map at least contains a "name".
*/
public Map<String, Map<String, String>> extractBrokerConfigurations(Map<String, Object> properties) {
public Map<String, Config> extractBrokerConfigurations(Map<String, Object> properties) {
Map<String, Map<String, String>> configPerBroker = new HashMap<String, Map<String, String>>();
for (Entry<String, Object> entry : properties.entrySet()) {
String key = entry.getKey();
Expand Down Expand Up @@ -106,7 +172,8 @@ public Map<String, Map<String, String>> extractBrokerConfigurations(Map<String,
brokerConfig.put(subkeys[1], value);
}

return configPerBroker;
return configPerBroker.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> new Config(entry.getValue())));
}

/**
Expand All @@ -133,9 +200,9 @@ public void modified(Map<String, Object> config) {
return;
}

Map<String, Map<String, String>> brokerConfigs = extractBrokerConfigurations(config);
Map<String, Config> brokerConfigs = extractBrokerConfigurations(config);

for (Map<String, String> brokerConfig : brokerConfigs.values()) {
for (Config brokerConfig : brokerConfigs.values()) {
try {
final MqttBrokerConnection conn = addBrokerConnection(brokerConfig);
if (conn == null) {
Expand Down Expand Up @@ -228,28 +295,31 @@ public boolean addBrokerConnection(MqttBrokerConnection connection) {
}

/**
* Add a broker by a configuration key-value map. You need to provide at least a "name" and an "url".
* Add a broker by a configuration.
*
* <p>
* You need to provide at least a "name" and an "url".
* Additional properties are "user","pwd","qos","retain","lwt","keepAlive","clientId", please read the
* service configuration documentation for a detailed description.
*
* @param brokerConnectionConfig The configuration key-value map.
* @param cfg The configuration key-value map.
* @return Returns the created broker connection or null if there is already a connection with the same name.
* @throws ConfigurationException Most likely your provided name and url are invalid.
* @throws MqttException
*/
public @Nullable MqttBrokerConnection addBrokerConnection(Map<String, String> brokerConnectionConfig)
throws ConfigurationException, MqttException {
public @Nullable MqttBrokerConnection addBrokerConnection(Config cfg) throws ConfigurationException, MqttException {
// Extract mandatory fields
String brokerID = brokerConnectionConfig.get(NAME_PROPERTY);
String brokerID = cfg.name;
if (brokerID == null || brokerID.isEmpty()) {
throw new ConfigurationException("MQTT Broker property 'name' is not provided");
}
brokerID = brokerID.toLowerCase();

final String brokerURL = brokerConnectionConfig.get("url");
final String brokerURL = cfg.url;
if (brokerURL == null || brokerURL.isEmpty()) {
throw new ConfigurationException("MQTT Broker property 'url' is not provided");
}

// Add the connection
MqttBrokerConnection connection;
synchronized (brokerConnections) {
Expand All @@ -262,21 +332,19 @@ public boolean addBrokerConnection(MqttBrokerConnection connection) {
}

// Extract further configurations
connection.setCredentials(brokerConnectionConfig.get("user"), brokerConnectionConfig.get("pwd"));
connection.setClientId(brokerConnectionConfig.get("clientId"));
String property = brokerConnectionConfig.get("keepAlive");
if (!StringUtils.isBlank(property)) {
connection.setKeepAliveInterval(Integer.valueOf(property));
connection.setCredentials(cfg.user, cfg.pwd);
connection.setClientId(cfg.clientId);
if (cfg.keepAlive != null) {
connection.setKeepAliveInterval(cfg.keepAlive);
}
property = brokerConnectionConfig.get("qos");
if (!StringUtils.isBlank(property)) {
connection.setQos(Integer.valueOf(property));
if (cfg.qos != null) {
connection.setQos(cfg.qos);
}
property = brokerConnectionConfig.get("retain");
if (!StringUtils.isBlank(property)) {
connection.setRetain(Boolean.valueOf(property));
if (cfg.retain != null) {
connection.setRetain(cfg.retain);
}
MqttWillAndTestament will = MqttWillAndTestament.fromString(brokerConnectionConfig.get("lwt"));
MqttWillAndTestament will = MqttWillAndTestament.fromString(cfg.lwt, cfg.lwtTopic, cfg.lwtMessage, cfg.lwtQos,
cfg.lwtRetain);
if (will != null) {
logger.debug("Setting last will: {}", will);
connection.setLastWill(will);
Expand Down
Expand Up @@ -12,19 +12,27 @@
*/
package org.eclipse.smarthome.io.transport.mqtt;

import java.nio.charset.StandardCharsets;

import org.apache.commons.lang.StringUtils;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;

/**
* Class encapsulating the last will and testament that is published after the client has gone offline.
*
* @author Markus Mann
*
*/
@NonNullByDefault
public class MqttWillAndTestament {
private String topic;
private byte[] payload;
private int qos = 0;
private boolean retain = false;
private static final int DFL_QOS = 0;
private static final boolean DFL_RETAIN = false;

private final String topic;
private final byte @Nullable [] payload;
private final int qos;
private final boolean retain;

/**
* Create an instance of the last will using a string with the following format:<br/>
Expand All @@ -40,42 +48,65 @@ public class MqttWillAndTestament {
* @param string the string to parse. If null, null is returned
* @return the will instance, will be null only if parameter is null
*/
public static MqttWillAndTestament fromString(String string) {
if (string == null) {
return null;
}
MqttWillAndTestament result = new MqttWillAndTestament();
String[] components = string.split(":");
for (int i = 0; i < Math.min(components.length, 4); i++) {
String value = StringUtils.trimToEmpty(components[i]);
switch (i) {
case 0:
result.topic = value;
break;
case 1:
result.payload = value.getBytes();
break;
case 2:
if (!"".equals(value)) {
int qos = Integer.valueOf(value);
if (qos >= 0 && qos <= 2) {
result.qos = qos;
public static @Nullable MqttWillAndTestament fromString(@Nullable String string) {
return fromString(string, null, null, null, null);
}

public static @Nullable MqttWillAndTestament fromString(@Nullable String string, @Nullable String topic,
byte @Nullable [] payload, @Nullable Integer qos, @Nullable Boolean retain) {
String tmpTopic = null;
byte[] tmpPayload = null;
int tmpQos = DFL_QOS;
boolean tmpRetain = DFL_RETAIN;

// Parse string if given.
if (string != null) {
String[] components = string.split(":");
for (int i = 0; i < Math.min(components.length, 4); i++) {
String value = StringUtils.trimToEmpty(components[i]);
switch (i) {
case 0:
tmpTopic = value;
break;
case 1:
tmpPayload = value.getBytes(StandardCharsets.UTF_8);
break;
case 2:
if (!"".equals(value)) {
int tmp = Integer.valueOf(value);
if (tmp >= 0 && tmp <= 2) {
tmpQos = tmp;
}
}
}
break;
case 3:
result.retain = Boolean.valueOf(value);
break;
break;
case 3:
tmpRetain = Boolean.valueOf(value);
break;
}
}
}
return result.isValid() ? result : null;
}

/**
* Hide the constructor and force consumers to use the fromString() method or the
* constructor requiring all field parameters to be set.
*/
private MqttWillAndTestament() {
// Use explicit given values.
if (topic != null) {
tmpTopic = topic;
}
if (payload != null) {
tmpPayload = payload;
}
if (qos != null) {
tmpQos = qos;
}
if (retain != null) {
tmpRetain = retain;
}

// Check if valid
if (tmpTopic == null || tmpTopic.isEmpty()) {
return null;
}

// Create MQTT Last Will and Testament object
return new MqttWillAndTestament(tmpTopic, tmpPayload, tmpQos, tmpRetain);
}

/**
Expand All @@ -86,7 +117,7 @@ private MqttWillAndTestament() {
* @param qos Valid values are 0 (Deliver at most once),1 (Deliver at least once) or 2</li>
* @param retain true if messages shall be retained
*/
public MqttWillAndTestament(String topic, byte[] payload, int qos, boolean retain) {
public MqttWillAndTestament(String topic, byte @Nullable [] payload, int qos, boolean retain) {
if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("Topic must be set");
}
Expand All @@ -96,13 +127,6 @@ public MqttWillAndTestament(String topic, byte[] payload, int qos, boolean retai
this.retain = retain;
}

/**
* Return true if the last will and testament object is valid.
*/
private boolean isValid() {
return !StringUtils.isBlank(topic);
}

/**
* @return the topic for the last will.
*/
Expand All @@ -113,7 +137,7 @@ public String getTopic() {
/**
* @return the payload of the last will.
*/
public byte[] getPayload() {
public byte @Nullable [] getPayload() {
return payload;
}

Expand Down

0 comments on commit 421c1c0

Please sign in to comment.