Skip to content
This repository has been archived by the owner on May 7, 2020. It is now read-only.

MQTT last will and testament #5231

Merged
merged 3 commits into from
Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public void extractBrokerConfigurationsTests() throws ConfigurationException, Mq
Map<String, Object> properties = new Hashtable<>();
properties.put("bam.name", "brokername");
properties.put("bam.url", "tcp://123.123.123.123");
Map<String, Map<String, String>> map = service.extractBrokerConfigurations(properties);
Map<String, MqttService.Config> map = service.extractBrokerConfigurations(properties);
assertEquals(map.size(), 1);
Map<String, String> data = map.get("bam");
MqttService.Config data = map.get("bam");
assertNotNull(data);
assertEquals("brokername", data.get("name"));
assertEquals("tcp://123.123.123.123", data.get("url"));
assertEquals("brokername", data.name);
assertEquals("tcp://123.123.123.123", data.url);
}

// Tests if updates to the textual configuration are processed correctly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.smarthome.io.transport.mqtt;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -20,6 +21,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import javax.naming.ConfigurationException;

Expand Down Expand Up @@ -51,6 +53,70 @@
"service.pid=org.eclipse.smarthome.mqtt" })
@NonNullByDefault
public class MqttService {
public static class Config {
public final @Nullable String name;
public final @Nullable String url;
public final @Nullable String user;
public final @Nullable String pwd;
public final @Nullable String clientId;
public final @Nullable Integer keepAlive;
public final @Nullable Integer qos;
public final @Nullable Boolean retain;
public final @Nullable String lwt;
public final @Nullable String lwtTopic;
public final byte @Nullable [] lwtMessage;
public final @Nullable Integer lwtQos;
public final @Nullable Boolean lwtRetain;

public Config(final Map<String, String> cfg) {
name = cfg.get(NAME_PROPERTY);
url = cfg.get("url");
user = cfg.get("user");
pwd = cfg.get("pwd");
clientId = cfg.get("clientId");
keepAlive = asInt(cfg.get("keepAlive"));
qos = asInt(cfg.get("qos"));
retain = asBool(cfg.get("retain"));

// Check for Last Will and Testament
lwt = cfg.get("lwt");

// Inspect explicit given LWT values
lwtTopic = cfg.get("lwtTopic");
String tmp = cfg.get("lwtMessage");
if (tmp != null) {
lwtMessage = tmp.getBytes(StandardCharsets.UTF_8);
} else {
lwtMessage = null;
}
Integer tmpInt = asInt(cfg.get("lwtQos"));
if (tmpInt != null && tmpInt >= 0 && tmpInt <= 2) {
lwtQos = tmpInt;
} else {
lwtQos = null;
}
lwtRetain = asBool(cfg.get("lwtRetain"));
}

private static @Nullable Integer asInt(final @Nullable String value) {
if (value == null || value.isEmpty()) {
return null;
}
try {
return Integer.valueOf(value);
} catch (final NumberFormatException ex) {
return null;
}
}

private static @Nullable Boolean asBool(final @Nullable String value) {
if (value == null || value.isEmpty()) {
return null;
}
return Boolean.valueOf(value);
}
}

private static final String NAME_PROPERTY = "name";
private final Logger logger = LoggerFactory.getLogger(MqttService.class);
private final Map<String, MqttBrokerConnection> brokerConnections = new ConcurrentHashMap<String, MqttBrokerConnection>();
Expand All @@ -71,7 +137,7 @@ public class MqttService {
* @param properties Service configuration
* @return A 'list' of broker configurations as key-value maps. A configuration map at least contains a "name".
*/
public Map<String, Map<String, String>> extractBrokerConfigurations(Map<String, Object> properties) {
public Map<String, Config> extractBrokerConfigurations(Map<String, Object> properties) {
Map<String, Map<String, String>> configPerBroker = new HashMap<String, Map<String, String>>();
for (Entry<String, Object> entry : properties.entrySet()) {
String key = entry.getKey();
Expand Down Expand Up @@ -106,7 +172,8 @@ public Map<String, Map<String, String>> extractBrokerConfigurations(Map<String,
brokerConfig.put(subkeys[1], value);
}

return configPerBroker;
return configPerBroker.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> new Config(entry.getValue())));
}

/**
Expand All @@ -133,9 +200,9 @@ public void modified(Map<String, Object> config) {
return;
}

Map<String, Map<String, String>> brokerConfigs = extractBrokerConfigurations(config);
Map<String, Config> brokerConfigs = extractBrokerConfigurations(config);

for (Map<String, String> brokerConfig : brokerConfigs.values()) {
for (Config brokerConfig : brokerConfigs.values()) {
try {
final MqttBrokerConnection conn = addBrokerConnection(brokerConfig);
if (conn == null) {
Expand Down Expand Up @@ -228,28 +295,31 @@ public boolean addBrokerConnection(MqttBrokerConnection connection) {
}

/**
* Add a broker by a configuration key-value map. You need to provide at least a "name" and an "url".
* Add a broker by a configuration.
*
* <p>
* You need to provide at least a "name" and an "url".
* Additional properties are "user","pwd","qos","retain","lwt","keepAlive","clientId", please read the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be useful to mention the new properties here ( lwtTopic, lwtMessage lwtQos lwtRetain)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation has been changed to use a config object that properties are "fixed". As all fields are nullable and so optional the documentation needs to state which ones are required.
I don't know if it make sense to sync the additional properties all the time.
Perhaps we could remove the "Additional properties ..." part at all if this PR will be accepted (and before it is merged).

* service configuration documentation for a detailed description.
*
* @param brokerConnectionConfig The configuration key-value map.
* @param cfg The configuration key-value map.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not a key-value map anymore

* @return Returns the created broker connection or null if there is already a connection with the same name.
* @throws ConfigurationException Most likely your provided name and url are invalid.
* @throws MqttException
*/
public @Nullable MqttBrokerConnection addBrokerConnection(Map<String, String> brokerConnectionConfig)
throws ConfigurationException, MqttException {
public @Nullable MqttBrokerConnection addBrokerConnection(Config cfg) throws ConfigurationException, MqttException {
// Extract mandatory fields
String brokerID = brokerConnectionConfig.get(NAME_PROPERTY);
String brokerID = cfg.name;
if (brokerID == null || brokerID.isEmpty()) {
throw new ConfigurationException("MQTT Broker property 'name' is not provided");
}
brokerID = brokerID.toLowerCase();

final String brokerURL = brokerConnectionConfig.get("url");
final String brokerURL = cfg.url;
if (brokerURL == null || brokerURL.isEmpty()) {
throw new ConfigurationException("MQTT Broker property 'url' is not provided");
}

// Add the connection
MqttBrokerConnection connection;
synchronized (brokerConnections) {
Expand All @@ -262,21 +332,19 @@ public boolean addBrokerConnection(MqttBrokerConnection connection) {
}

// Extract further configurations
connection.setCredentials(brokerConnectionConfig.get("user"), brokerConnectionConfig.get("pwd"));
connection.setClientId(brokerConnectionConfig.get("clientId"));
String property = brokerConnectionConfig.get("keepAlive");
if (!StringUtils.isBlank(property)) {
connection.setKeepAliveInterval(Integer.valueOf(property));
connection.setCredentials(cfg.user, cfg.pwd);
connection.setClientId(cfg.clientId);
if (cfg.keepAlive != null) {
connection.setKeepAliveInterval(cfg.keepAlive);
}
property = brokerConnectionConfig.get("qos");
if (!StringUtils.isBlank(property)) {
connection.setQos(Integer.valueOf(property));
if (cfg.qos != null) {
connection.setQos(cfg.qos);
}
property = brokerConnectionConfig.get("retain");
if (!StringUtils.isBlank(property)) {
connection.setRetain(Boolean.valueOf(property));
if (cfg.retain != null) {
connection.setRetain(cfg.retain);
}
MqttWillAndTestament will = MqttWillAndTestament.fromString(brokerConnectionConfig.get("lwt"));
MqttWillAndTestament will = MqttWillAndTestament.fromString(cfg.lwt, cfg.lwtTopic, cfg.lwtMessage, cfg.lwtQos,
cfg.lwtRetain);
if (will != null) {
logger.debug("Setting last will: {}", will);
connection.setLastWill(will);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,27 @@
*/
package org.eclipse.smarthome.io.transport.mqtt;

import java.nio.charset.StandardCharsets;

import org.apache.commons.lang.StringUtils;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;

/**
* Class encapsulating the last will and testament that is published after the client has gone offline.
*
* @author Markus Mann
*
*/
@NonNullByDefault
public class MqttWillAndTestament {
private String topic;
private byte[] payload;
private int qos = 0;
private boolean retain = false;
private static final int DFL_QOS = 0;
private static final boolean DFL_RETAIN = false;

private final String topic;
private final byte @Nullable [] payload;
private final int qos;
private final boolean retain;

/**
* Create an instance of the last will using a string with the following format:<br/>
Expand All @@ -40,42 +48,65 @@ public class MqttWillAndTestament {
* @param string the string to parse. If null, null is returned
* @return the will instance, will be null only if parameter is null
*/
public static MqttWillAndTestament fromString(String string) {
if (string == null) {
return null;
}
MqttWillAndTestament result = new MqttWillAndTestament();
String[] components = string.split(":");
for (int i = 0; i < Math.min(components.length, 4); i++) {
String value = StringUtils.trimToEmpty(components[i]);
switch (i) {
case 0:
result.topic = value;
break;
case 1:
result.payload = value.getBytes();
break;
case 2:
if (!"".equals(value)) {
int qos = Integer.valueOf(value);
if (qos >= 0 && qos <= 2) {
result.qos = qos;
public static @Nullable MqttWillAndTestament fromString(@Nullable String string) {
return fromString(string, null, null, null, null);
}

public static @Nullable MqttWillAndTestament fromString(@Nullable String string, @Nullable String topic,
byte @Nullable [] payload, @Nullable Integer qos, @Nullable Boolean retain) {
String tmpTopic = null;
byte[] tmpPayload = null;
int tmpQos = DFL_QOS;
boolean tmpRetain = DFL_RETAIN;

// Parse string if given.
if (string != null) {
String[] components = string.split(":");
for (int i = 0; i < Math.min(components.length, 4); i++) {
String value = StringUtils.trimToEmpty(components[i]);
switch (i) {
case 0:
tmpTopic = value;
break;
case 1:
tmpPayload = value.getBytes(StandardCharsets.UTF_8);
break;
case 2:
if (!"".equals(value)) {
int tmp = Integer.valueOf(value);
if (tmp >= 0 && tmp <= 2) {
tmpQos = tmp;
}
}
}
break;
case 3:
result.retain = Boolean.valueOf(value);
break;
break;
case 3:
tmpRetain = Boolean.valueOf(value);
break;
}
}
}
return result.isValid() ? result : null;
}

/**
* Hide the constructor and force consumers to use the fromString() method or the
* constructor requiring all field parameters to be set.
*/
private MqttWillAndTestament() {
// Use explicit given values.
if (topic != null) {
tmpTopic = topic;
}
if (payload != null) {
tmpPayload = payload;
}
if (qos != null) {
tmpQos = qos;
}
if (retain != null) {
tmpRetain = retain;
}

// Check if valid
if (tmpTopic == null || tmpTopic.isEmpty()) {
return null;
}

// Create MQTT Last Will and Testament object
return new MqttWillAndTestament(tmpTopic, tmpPayload, tmpQos, tmpRetain);
}

/**
Expand All @@ -86,7 +117,7 @@ private MqttWillAndTestament() {
* @param qos Valid values are 0 (Deliver at most once),1 (Deliver at least once) or 2</li>
* @param retain true if messages shall be retained
*/
public MqttWillAndTestament(String topic, byte[] payload, int qos, boolean retain) {
public MqttWillAndTestament(String topic, byte @Nullable [] payload, int qos, boolean retain) {
if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("Topic must be set");
}
Expand All @@ -96,13 +127,6 @@ public MqttWillAndTestament(String topic, byte[] payload, int qos, boolean retai
this.retain = retain;
}

/**
* Return true if the last will and testament object is valid.
*/
private boolean isValid() {
return !StringUtils.isBlank(topic);
}

/**
* @return the topic for the last will.
*/
Expand All @@ -113,7 +137,7 @@ public String getTopic() {
/**
* @return the payload of the last will.
*/
public byte[] getPayload() {
public byte @Nullable [] getPayload() {
return payload;
}

Expand Down