From 370b6cfa5bc0b2d4cd83a74ea11699b389fd5ad9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Gr=C3=A4ff?= Date: Fri, 1 Sep 2017 19:05:02 +1000 Subject: [PATCH] MqttService: Remove all deprecated methods, rename close to stop and more. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Remove name from MqttBrokerConnection. This is only necessary for the MqttService. Consumers of MqttBrokerConnection should not be bothered with that detail. * Remove textConfigured flag. Again this is not interesting for anything outside MqttService. * Don't parse configuration parameters ourself, but use a configuration values class instead. * Use Nullable were it makes sense. Signed-off-by: David Gräff --- ...sts.java => MqttBrokerConnectionTest.java} | 85 ++--- ...ServiceTests.java => MqttServiceTest.java} | 52 +-- .../META-INF/MANIFEST.MF | 1 + .../transport/mqtt/MqttBrokerConnection.java | 187 +++++------ .../transport/mqtt/MqttConnectionState.java | 1 + .../transport/mqtt/MqttMessageConsumer.java | 37 --- .../transport/mqtt/MqttMessageProducer.java | 29 -- .../io/transport/mqtt/MqttSenderChannel.java | 32 -- .../io/transport/mqtt/MqttService.java | 298 ++++-------------- .../internal/ConnectionConfiguration.java | 27 ++ .../mqtt/internal/MqttSenderChannelImpl.java | 43 --- .../reconnect/PeriodicReconnectStrategy.java | 2 +- 12 files changed, 240 insertions(+), 554 deletions(-) rename bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/{MqttBrokerConnectionTests.java => MqttBrokerConnectionTest.java} (70%) rename bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/{MqttServiceTests.java => MqttServiceTest.java} (65%) delete mode 100644 bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageConsumer.java delete mode 100644 bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageProducer.java delete mode 100644 bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttSenderChannel.java create mode 100644 bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ConnectionConfiguration.java delete mode 100644 bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttSenderChannelImpl.java diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTest.java similarity index 70% rename from bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java rename to bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTest.java index e83a785148a..f9fb994ec45 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTests.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnectionTest.java @@ -27,31 +27,10 @@ * * @author David Graeff - Initial contribution */ -public class MqttBrokerConnectionTests { - @Test - public void testConstructor() throws ConfigurationException { - // Test tcp and ssl URLs - MqttBrokerConnection a = new MqttBrokerConnection("name", "tcp://123.123.123.123", false); - MqttBrokerConnection b = new MqttBrokerConnection("name", "ssl://123.123.123.123", true); - assertFalse(a.isTextualConfiguredBroker()); - assertTrue(b.isTextualConfiguredBroker()); - } - - @Test(expected = ConfigurationException.class) - public void testConstructorInvalidProtocol() throws ConfigurationException { - new MqttBrokerConnection("name", "unsupported://123.123.123.123", false); - } - - @Test(expected = ConfigurationException.class) - public void testConstructorInvalidName() throws ConfigurationException, MqttException { - new MqttBrokerConnection(" ", "tcp://123.123.123.123", false); - } - +public class MqttBrokerConnectionTest { @Test public void messageConsumerTests() throws ConfigurationException, MqttException { - final String url = "tcp://123.123.123.123"; - final String name = "TestName12@!"; - MqttBrokerConnection a = new MqttBrokerConnection(name, url, false); + MqttBrokerConnection a = new MqttBrokerConnection("123.123.123.123", 0, false); // Expect no consumers assertFalse(a.hasConsumers()); @@ -69,9 +48,7 @@ public void messageConsumerTests() throws ConfigurationException, MqttException @Test public void reconnectPolicyDefaultTest() throws ConfigurationException, MqttException, InterruptedException { - final String url = "tcp://123.123.123.123"; - final String name = "TestName12@!"; - MqttBrokerConnection a = new MqttBrokerConnection(name, url, false); + MqttBrokerConnection a = new MqttBrokerConnection("123.123.123.123", 0, false); // Check if the default policy is set and that the broker within the policy is set. assertTrue(a.getReconnectStrategy() instanceof PeriodicReconnectStrategy); @@ -81,9 +58,7 @@ public void reconnectPolicyDefaultTest() throws ConfigurationException, MqttExce @Test public void reconnectPolicyTests() throws ConfigurationException, MqttException, InterruptedException { - final String url = "tcp://123.123.123.123"; - final String name = "TestName12@!"; - MqttBrokerConnection a = spy(new MqttBrokerConnection(name, url, false)); + MqttBrokerConnection a = spy(new MqttBrokerConnection("123.123.123.123", 0, false)); // Check setter a.setReconnectStrategy(new PeriodicReconnectStrategy()); @@ -91,16 +66,15 @@ public void reconnectPolicyTests() throws ConfigurationException, MqttException, // Prepare a Mock to test if lostConnect is called and // if the PeriodicReconnectPolicy indeed calls start() - PeriodicReconnectStrategy mockPolicy = spy(new PeriodicReconnectStrategy()); - doReturn(a).when(mockPolicy).getBrokerConnection(); - doReturn(0).when(mockPolicy).getFirstReconnectAfter(); - doReturn(10000).when(mockPolicy).getReconnectFrequency(); + PeriodicReconnectStrategy mockPolicy = spy(new PeriodicReconnectStrategy(10000, 0)); doNothing().when(a).start(); + mockPolicy.start(); + a.isConnecting = true; // Fake a disconnect a.setReconnectStrategy(mockPolicy); IMqttActionListener l = a.createConnectionListener(); - doReturn(false).when(a).isConnected(); + doReturn(MqttConnectionState.DISCONNECTED).when(a).connectionState(); IMqttToken token = mock(IMqttToken.class); when(token.getException()).thenReturn(new org.eclipse.paho.client.mqttv3.MqttException(1)); l.onFailure(token, null); @@ -118,9 +92,7 @@ public void reconnectPolicyTests() throws ConfigurationException, MqttException, @Test public void connectionObserverTests() throws ConfigurationException, MqttException { - final String url = "tcp://123.123.123.123"; - final String name = "TestName12@!"; - MqttBrokerConnection a = spy(new MqttBrokerConnection(name, url, false)); + MqttBrokerConnection a = spy(new MqttBrokerConnection("123.123.123.123", 0, false)); // Add an observer assertFalse(a.hasConnectionObservers()); @@ -133,7 +105,7 @@ public void connectionObserverTests() throws ConfigurationException, MqttExcepti // Cause a success callback IMqttActionListener l = a.createConnectionListener(); - doReturn(true).when(a).isConnected(); + doReturn(MqttConnectionState.CONNECTED).when(a).connectionState(); l.onSuccess(null); verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.CONNECTED), anyObject()); @@ -143,7 +115,7 @@ public void connectionObserverTests() throws ConfigurationException, MqttExcepti 1); when(token.getException()).thenReturn(testException); - doReturn(false).when(a).isConnected(); + doReturn(MqttConnectionState.DISCONNECTED).when(a).connectionState(); l.onFailure(token, null); verify(connectionObserver, times(1)).connectionStateChanged(eq(MqttConnectionState.DISCONNECTED), eq(testException)); @@ -155,9 +127,7 @@ public void connectionObserverTests() throws ConfigurationException, MqttExcepti @Test public void lastWillAndTestamentTests() throws ConfigurationException { - final String url = "tcp://123.123.123.123"; - final String name = "TestName12@!"; - MqttBrokerConnection a = new MqttBrokerConnection(name, url, false); + MqttBrokerConnection a = new MqttBrokerConnection("123.123.123.123", 0, false); assertNull(a.getLastWill()); assertNull(MqttWillAndTestament.fromString("")); @@ -174,19 +144,28 @@ public void lastWillAndTestamentConstructorTests() { new MqttWillAndTestament("", new byte[0], 0, false); } + @Test(expected = IllegalArgumentException.class) + public void tooLongClientID() throws ConfigurationException { + MqttBrokerConnection a = new MqttBrokerConnection("123.123.123.123", 0, false); + // client ids longer than 23 characters should throw + a.setClientId("clientidclientidclientidclientid"); + } + + @Test(expected = IllegalArgumentException.class) + public void qosInvalid() throws ConfigurationException { + MqttBrokerConnection a = new MqttBrokerConnection("123.123.123.123", 0, false); + a.setQos(10); + } + @Test public void setterGetterTests() throws ConfigurationException { - final String url = "tcp://123.123.123.123"; - final String name = "TestName12@!"; - MqttBrokerConnection a = new MqttBrokerConnection(name, url, false); - assertEquals("URL getter", a.getUrl(), url); - assertEquals("Name getter", a.getName(), name); + MqttBrokerConnection a = new MqttBrokerConnection("123.123.123.123", 0, false); + assertEquals("URL getter", a.getHost(), "123.123.123.123"); + assertEquals("Name getter", a.getPort(), 0); + assertEquals("Secure getter", a.isSecure(), false); a.setClientId("clientid"); assertEquals("ClientID getter/setter", "clientid", a.getClientId()); - // client ids longer than 23 characters should be ignored - a.setClientId("clientidclientidclientidclientid"); - assertEquals("ClientID too long check", "clientid", a.getClientId()); a.setCredentials("user@!", "password123@^"); assertEquals("User getter/setter", "user@!", a.getUser()); @@ -200,10 +179,6 @@ public void setterGetterTests() throws ConfigurationException { a.setRetain(true); assertTrue(a.isRetain()); - assertEquals(MqttBrokerConnection.DEFAULT_QOS, a.getQos()); - a.setQos(10); - assertEquals(MqttBrokerConnection.DEFAULT_QOS, a.getQos()); - a.setQos(-10); assertEquals(MqttBrokerConnection.DEFAULT_QOS, a.getQos()); a.setQos(2); assertEquals(2, a.getQos()); @@ -214,6 +189,6 @@ public void setterGetterTests() throws ConfigurationException { assertNotNull(a.getSSLContextProvider()); assertNotNull(a.getReconnectStrategy()); - assertFalse(a.isConnected()); + assertThat(a.connectionState(), is(MqttConnectionState.DISCONNECTED)); } } diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTests.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTest.java similarity index 65% rename from bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTests.java rename to bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTest.java index a985af9a482..d4adc3ceeec 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTests.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt.test/src/test/java/org/eclipse/smarthome/io/transport/mqtt/MqttServiceTest.java @@ -11,12 +11,15 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.math.BigDecimal; import java.util.Collections; import java.util.Hashtable; import java.util.Map; import javax.naming.ConfigurationException; +import org.eclipse.smarthome.config.core.Configuration; +import org.eclipse.smarthome.io.transport.mqtt.internal.ConnectionConfiguration; import org.junit.Test; /** @@ -24,7 +27,7 @@ * * @author David Graeff - Initial contribution */ -public class MqttServiceTests { +public class MqttServiceTest { // Tests addBrokersListener/removeBrokersListener @Test public void brokerConnectionListenerTests() throws ConfigurationException { @@ -35,11 +38,11 @@ public void brokerConnectionListenerTests() throws ConfigurationException { service.addBrokersListener(observer); assertTrue(service.hasBrokerObservers()); - MqttBrokerConnection connection = new MqttBrokerConnection("name", "tcp://123.123.123.123", false); - assertTrue(service.addBrokerConnection(connection)); + MqttBrokerConnection connection = new MqttBrokerConnection("123.123.123.123", 0, false); + assertTrue(service.addBrokerConnection("name", connection)); verify(observer).brokerAdded(connection); - service.removeBrokerConnection(connection); + service.removeBrokerConnection("name"); verify(observer).brokerRemoved(connection); service.removeBrokersListener(observer); @@ -52,39 +55,40 @@ public void extractBrokerConfigurationsTests() throws ConfigurationException, Mq MqttService service = new MqttService(); Map properties = new Hashtable<>(); - properties.put("bam.name", "brokername"); - properties.put("bam.url", "tcp://123.123.123.123"); - Map> map = service.extractBrokerConfigurations(properties); + properties.put("bam.host", "123.123.123.123"); + properties.put("bam.port", BigDecimal.valueOf(1234)); + properties.put("bam.secure", false); + Map> map = service.extractBrokerConfigurations(properties); assertEquals(map.size(), 1); - Map data = map.get("bam"); - assertNotNull(data); - assertEquals("brokername", data.get("name")); - assertEquals("tcp://123.123.123.123", data.get("url")); + Map data = map.get("bam"); + @SuppressWarnings("null") + ConnectionConfiguration c = new Configuration(data).as(ConnectionConfiguration.class); + assertEquals("123.123.123.123", c.host); + assertEquals(false, c.secure); + assertEquals(BigDecimal.valueOf(1234), c.port); } - // Tests if updates to the textual configuration are processed correctly + // Tests if updates to the system broker connections via ConfigAdmin are processed correctly @Test - public void textualConfigurationTests() throws ConfigurationException, MqttException { + public void updateViaConfigAdminTests() throws ConfigurationException, MqttException { MqttService service = new MqttService(); Map properties = new Hashtable<>(); - properties.put("bam.name", "brokername"); - properties.put("bam.url", "tcp://123.123.123.123"); + properties.put("bam.host", "123.123.123.123"); // Test activate service.activate(properties); assertThat(service.getAllBrokerConnections(), hasSize(1)); - assertNotNull(service.getBrokerConnection("brokername")); + assertNotNull(service.getBrokerConnection("bam")); Map properties2 = new Hashtable<>(); - properties2.put("bam2.name", "brokername2"); - properties2.put("bam2.url", "tcp://123.123.123.123"); + properties2.put("bam2.url", "123.123.123.123"); // Test configuration change service.modified(properties2); assertThat(service.getAllBrokerConnections(), hasSize(1)); - assertNull(service.getBrokerConnection("brokername")); - assertNotNull(service.getBrokerConnection("brokername2")); + assertNull(service.getBrokerConnection("bam")); + assertNotNull(service.getBrokerConnection("bam2")); // Test if old broker connections are freed correctly service.modified(Collections.emptyMap()); @@ -96,22 +100,22 @@ public void brokerConnectionAddRemoveEnumerateTests() { MqttService service = new MqttService(); MqttBrokerConnection connection; try { - connection = new MqttBrokerConnection("name", "tcp://123.123.123.123", false); + connection = new MqttBrokerConnection("123.123.123.123", 0, false); } catch (ConfigurationException c) { fail("Couldn't create a MqttBrokerConnection object"); return; } // Add assertThat(service.getAllBrokerConnections(), hasSize(0)); - assertTrue(service.addBrokerConnection(connection)); - assertFalse(service.addBrokerConnection(connection)); + assertTrue(service.addBrokerConnection("name", connection)); + assertFalse(service.addBrokerConnection("name", connection)); // Get/Enumerate assertNotNull(service.getBrokerConnection("name")); assertThat(service.getAllBrokerConnections(), hasSize(1)); // Remove - service.removeBrokerConnection(connection); + service.removeBrokerConnection("name"); assertThat(service.getAllBrokerConnections(), hasSize(0)); } } diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/META-INF/MANIFEST.MF b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/META-INF/MANIFEST.MF index 20316fd1b76..24f304ce5bc 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/META-INF/MANIFEST.MF +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/META-INF/MANIFEST.MF @@ -17,6 +17,7 @@ Import-Package: javax.naming, org.eclipse.paho.client.mqttv3.logging, org.eclipse.paho.client.mqttv3.persist, org.eclipse.paho.client.mqttv3.util, + org.eclipse.smarthome.config.core, org.eclipse.smarthome.core.events, org.eclipse.smarthome.io.transport.mqtt, org.osgi.service.component, diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java index ca7668b088f..b870c59d8ef 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttBrokerConnection.java @@ -26,7 +26,6 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.eclipse.smarthome.io.transport.mqtt.internal.MqttSenderChannelImpl; import org.eclipse.smarthome.io.transport.mqtt.reconnect.AbstractReconnectStrategy; import org.eclipse.smarthome.io.transport.mqtt.reconnect.PeriodicReconnectStrategy; import org.eclipse.smarthome.io.transport.mqtt.sslcontext.AcceptAllCertificatesSSLContext; @@ -44,16 +43,15 @@ * @author Davy Vanherbergen * @author Markus Rathgeb - added connection state callback */ -@SuppressWarnings("deprecation") public class MqttBrokerConnection { private final Logger logger = LoggerFactory.getLogger(MqttBrokerConnection.class); public static final int DEFAULT_KEEPALIVE_INTERVAL = 60; public static final int DEFAULT_QOS = 0; /// Configuration variables - private final boolean textualConfiguredBroker; - private final String name; - private final String url; + private String host; + private int port; + private boolean secure; private String user; private String password; private int qos = DEFAULT_QOS; @@ -64,15 +62,11 @@ public class MqttBrokerConnection { private int keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL; /// Runtime variables - private String clientId; - private MqttAsyncClient client; - private boolean isConnecting = false; - - private final List connectionObservers = new CopyOnWriteArrayList<>(); - private final Map> consumers = new HashMap<>(); - // This should be removed by 2018 and before ESH 1.0 - @Deprecated - private final List producers = new CopyOnWriteArrayList(); + protected String clientId; + protected MqttAsyncClient client; + protected boolean isConnecting = false; + protected final List connectionObservers = new CopyOnWriteArrayList<>(); + protected final Map> consumers = new HashMap<>(); /** * A private object to implement the MqttCallback interface. @@ -84,11 +78,10 @@ private class ClientCallbacks implements MqttCallback { public synchronized void connectionLost(Throwable t) { if (t instanceof MqttException) { MqttException e = (MqttException) t; - logger.info("MQTT connection to '{}' was lost: {} : ReasonCode {} : Cause : {}", getName(), - e.getMessage(), e.getReasonCode(), - (e.getCause() == null ? "Unknown" : e.getCause().getMessage())); + logger.info("MQTT connection to '{}' was lost: {} : ReasonCode {} : Cause : {}", host, e.getMessage(), + e.getReasonCode(), (e.getCause() == null ? "Unknown" : e.getCause().getMessage())); } else { - logger.info("MQTT connection to '{}' was lost: {}", getName(), t.getMessage()); + logger.info("MQTT connection to '{}' was lost: {}", host, t.getMessage()); } for (final MqttConnectionObserver connectionObserver : connectionObservers) { @@ -128,24 +121,13 @@ public void messageArrived(String topic, MqttMessage message) { * Create a new connection with the given name. * * @param name for the connection. - * @param url url string for the MQTT broker. Valid URL's are in the format: tcp://localhost:1883 or - * ssl://localhost:8883 + * @param host A host name or address + * @param port A port or 0 to select the default port for a secure or insecure connection + * @param secure A secure connection * @throws ConfigurationException */ - public MqttBrokerConnection(@NonNull String name, @NonNull String url, boolean textualConfiguredBroker) - throws ConfigurationException { - this.textualConfiguredBroker = textualConfiguredBroker; - - if (name.isEmpty()) { - throw new ConfigurationException("No name for the broker set!"); - } - if (url.isEmpty() || (!url.startsWith("tcp://") && !url.startsWith("ssl://"))) { - throw new ConfigurationException( - "No valid url for the broker set! Must be tcp://localhost:1234 or ssl://localhost:1234. Port is optional."); - } - - this.name = name; - this.url = url; + public MqttBrokerConnection(@NonNull String host, int port, boolean secure) throws ConfigurationException { + setHostPort(host, port, secure); setReconnectStrategy(new PeriodicReconnectStrategy()); } @@ -165,31 +147,43 @@ public AbstractReconnectStrategy getReconnectStrategy() { } /** - * @return name for the connection as defined in smarthome.cfg. + * Get the MQTT broker host */ - public String getName() { - return name; + public String getHost() { + return host; } /** - * Get the url for the MQTT broker. Valid URL's are in the format: - * tcp://localhost:1883 or ssl://localhost:8883 - * - * @return url for the MQTT broker. + * Get the MQTT broker port + */ + public int getPort() { + return port; + } + + /** + * Return true if it is a secure connection to the broker */ - public String getUrl() { - return url; + public boolean isSecure() { + return secure; } /** - * Return true if it is a textual configured broker (textual=true in the constructor). + * Set new connection details. + * The connection needs to be restarted for the new settings to take effect. + * + * @param host A host name or address + * @param port A port or 0 to select the default port for a secure or insecure connection + * @param secure A secure connection */ - public boolean isTextualConfiguredBroker() { - return textualConfiguredBroker; + public void setHostPort(@NonNull String host, int port, boolean secure) { + this.host = host; + this.port = port; + this.secure = secure; } /** * Set the optional user name and optional password to use when connecting to the MQTT broker. + * The connection needs to be restarted for the new settings to take effect. * * @param user Name to use for connection. * @param password The password @@ -221,13 +215,17 @@ public int getQos() { } /** - * Set quality of service. Valid values are 0,1,2 + * Set quality of service. Valid values are 0, 1, 2 and mean + * "at most once", "at least once" and "exactly once" respectively. + * The connection needs to be restarted for the new settings to take effect. * * @param qos level. */ public void setQos(int qos) { if (qos >= 0 && qos <= 2) { this.qos = qos; + } else { + throw new IllegalArgumentException("The quality of service parameter must be >=0 and <=2."); } } @@ -255,7 +253,8 @@ public MqttWillAndTestament getLastWill() { } /** - * Set the last will object + * Set the last will object. + * The connection needs to be restarted for the new settings to take effect. * * @param lastWill The last will object or null. */ @@ -266,13 +265,14 @@ public void setLastWill(MqttWillAndTestament lastWill) { /** * Set client id to use when connecting to the broker. * If none is specified, a default is generated. The client id cannot - * be longer than 23 characters. Longer strings will be ignored. + * be longer than 23 characters. Longer strings will throw an exception. + * The connection needs to be restarted for the new settings to take effect. * * @param value clientId to use. Can be null. */ public void setClientId(String value) { if (value != null && value.length() > 23) { - return; + throw new IllegalArgumentException("Client ID cannot be longer than 23 characters"); } this.clientId = value; } @@ -287,10 +287,14 @@ public String getClientId() { } /** - * Returns true if a connection to the Mqtt broker is established + * Returns the connection state */ - public boolean isConnected() { - return client != null && client.isConnected(); + public MqttConnectionState connectionState() { + if (isConnecting) { + return MqttConnectionState.CONNECTING; + } + return (client != null && client.isConnected()) ? MqttConnectionState.CONNECTED + : MqttConnectionState.DISCONNECTED; } /** @@ -311,7 +315,7 @@ public void setKeepAliveInterval(int keepAliveInterval) { * Return the keep alive internal in seconds */ public int getKeepAliveInterval() { - return this.keepAliveInterval; + return keepAliveInterval; } /** @@ -358,7 +362,7 @@ public boolean addConsumer(MqttMessageSubscriber subscriber) throws MqttExceptio subscriberList.add(subscriber); consumers.put(topic, subscriberList); } - if (isConnected()) { + if (connectionState() == MqttConnectionState.CONNECTED) { try { client.subscribe(topic, qos); } catch (org.eclipse.paho.client.mqttv3.MqttException e) { @@ -374,11 +378,10 @@ public boolean addConsumer(MqttMessageSubscriber subscriber) throws MqttExceptio * @param subscriber to remove. */ public void removeConsumer(MqttMessageSubscriber subscriber) { - logger.trace("Unsubscribing message consumer for topic '{}' from broker '{}'", subscriber.getTopic(), - getName()); + logger.trace("Unsubscribing message consumer for topic '{}' from broker '{}'", subscriber.getTopic(), host); try { - if (isConnected()) { + if (connectionState() == MqttConnectionState.CONNECTED) { client.unsubscribe(subscriber.getTopic()); } } catch (org.eclipse.paho.client.mqttv3.MqttException e) { @@ -421,33 +424,6 @@ public boolean hasConnectionObservers() { return !connectionObservers.isEmpty(); } - /** - * Add a new message producer to this connection. - * This is deprecated. Use the publish() method instead. - * - * @deprecated - * @param publisher to add. - */ - @Deprecated - public synchronized void addProducer(MqttMessageProducer publisher) { - producers.add(publisher); - if (isConnected()) { - publisher.setSenderChannel(new MqttSenderChannelImpl(this)); - } - } - - /** - * Remove a previously registered producer from this connection. - * - * @deprecated - * @param publisher to remove. - */ - @Deprecated - public synchronized void removeProducer(MqttMessageProducer publisher) { - publisher.setSenderChannel(null); - producers.remove(publisher); - } - /** * Create a MqttConnectOptions object using the fields of this MqttBrokerConnection instance. * Package local, for testing. @@ -461,7 +437,7 @@ MqttConnectOptions createMqttOptions() throws ConfigurationException { if (!StringUtils.isBlank(password)) { options.setPassword(password.toCharArray()); } - if (getUrl().toLowerCase().startsWith("ssl")) { + if (secure) { options.setSocketFactory(sslContextProvider.getContext().getSocketFactory()); } @@ -498,14 +474,11 @@ public void onSuccess(IMqttToken asyncActionToken) { } } - // start all producers - for (MqttMessageProducer p : producers) { - p.setSenderChannel(new MqttSenderChannelImpl(MqttBrokerConnection.this)); - } - for (final MqttConnectionObserver connectionObserver : connectionObservers) { connectionObserver.connectionStateChanged( - isConnected() ? MqttConnectionState.CONNECTED : MqttConnectionState.DISCONNECTED, null); + connectionState() == MqttConnectionState.CONNECTED ? MqttConnectionState.CONNECTED + : MqttConnectionState.DISCONNECTED, + null); } } @@ -513,7 +486,8 @@ public void onSuccess(IMqttToken asyncActionToken) { public void onFailure(IMqttToken asyncActionToken, Throwable exception) { for (final MqttConnectionObserver connectionObserver : connectionObservers) { connectionObserver.connectionStateChanged( - isConnected() ? MqttConnectionState.CONNECTED : MqttConnectionState.DISCONNECTED, + connectionState() == MqttConnectionState.CONNECTED ? MqttConnectionState.CONNECTED + : MqttConnectionState.DISCONNECTED, asyncActionToken.getException()); } @@ -538,13 +512,13 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) { * conn.addConnectionObserver((isConnected, error) -> o.notify() ); * conn.start(); * o.wait(timeout_in_ms); - * boolean success = conn.isConnected(); + * boolean success = conn.connectionState()==MqttConnectionState.CONNECTED; * * @throws MqttException If a communication error occurred, this exception is thrown. * @throws ConfigurationException If no url is given or parameters are invalid, this exception is thrown. */ public synchronized void start() throws MqttException, ConfigurationException { - if (isConnecting || isConnected()) { + if (isConnecting || connectionState() == MqttConnectionState.CONNECTED) { return; } @@ -558,19 +532,26 @@ public synchronized void start() throws MqttException, ConfigurationException { } // Storage - String tmpDir = System.getProperty("java.io.tmpdir") + "/" + getName(); + String tmpDir = System.getProperty("java.io.tmpdir") + "/" + host; MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir); + StringBuilder serverURI = new StringBuilder(); + serverURI.append((secure ? "ssl://" : "tcp://")); + serverURI.append(host); + if (port != 0) { + serverURI.append(port); + } + // Create client try { - client = new MqttAsyncClient(getUrl(), clientId, dataStore); + client = new MqttAsyncClient(serverURI.toString(), clientId, dataStore); } catch (org.eclipse.paho.client.mqttv3.MqttException e) { throw new MqttException(e); } client.setCallback(clientCallbacks); - logger.info("Starting MQTT broker connection '{}' to '{}' with clientid {} and file store '{}'", getName(), - getUrl(), getClientId(), tmpDir); + logger.info("Starting MQTT broker connection to '{}' with clientid {} and file store '{}'", host, getClientId(), + tmpDir); // Perform the connection attempt isConnecting = true; @@ -583,12 +564,12 @@ public synchronized void start() throws MqttException, ConfigurationException { } /** - * Close the MQTT connection. + * Stop the MQTT connection. * * You can re-establish a connection calling {@link #start()} again. */ - public synchronized void close() { - logger.trace("Closing the MQTT broker connection '{}'", getName()); + public synchronized void stop() { + logger.trace("Closing the MQTT broker connection '{}'", host); // Abort a connection attempt isConnecting = false; @@ -600,7 +581,7 @@ public synchronized void close() { // Close connection try { - if (isConnected()) { + if (connectionState() == MqttConnectionState.CONNECTED) { client.disconnect(); client = null; } diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttConnectionState.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttConnectionState.java index 0683819c3d4..1a39ea356be 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttConnectionState.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttConnectionState.java @@ -14,5 +14,6 @@ */ public enum MqttConnectionState { DISCONNECTED, + CONNECTING, CONNECTED } diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageConsumer.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageConsumer.java deleted file mode 100644 index 273f13da44e..00000000000 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageConsumer.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright (c) 2014-2017 by the respective copyright holders. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - */ -package org.eclipse.smarthome.io.transport.mqtt; - -import org.eclipse.smarthome.core.events.EventPublisher; - -/** - * All message consumers which want to register as a message consumer to a MqttBrokerConnection should implement this - * interface. - * - * This is a deprecated interface. Please use MqttMessageSubscriber instead. - * - * @author Davy Vanherbergen - * @deprecated - */ -@Deprecated -public interface MqttMessageConsumer extends MqttMessageSubscriber { - /** - * Set Topic to subscribe to. May contain + or # wildcards - * - * @param topic to subscribe to. - */ - public void setTopic(String topic); - - /** - * Set the event publisher to use when broadcasting received messages onto the smarthome event bus. - * - * @param eventPublisher - */ - public void setEventPublisher(EventPublisher eventPublisher); - -} diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageProducer.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageProducer.java deleted file mode 100644 index 49567b5c31c..00000000000 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttMessageProducer.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (c) 2014-2017 by the respective copyright holders. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - */ -package org.eclipse.smarthome.io.transport.mqtt; - -/** - * All message producers which want to register as a message producer to a MqttBrokerConnection should implement this - * interface. - * - * This interface is deprecated. Use the {@see MqttBrokerConnection.publish()} method instead. - * - * @deprecated - * @author Davy Vanherbergen - */ -@Deprecated -public interface MqttMessageProducer { - - /** - * Set the sender channel which the message producer should use to publish any message. - * - * @param channel Sender Channel which will be set by the MqttBrokerConnection. - */ - public void setSenderChannel(MqttSenderChannel channel); - -} diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttSenderChannel.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttSenderChannel.java deleted file mode 100644 index f294d652b71..00000000000 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttSenderChannel.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Copyright (c) 2014-2017 by the respective copyright holders. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - */ -package org.eclipse.smarthome.io.transport.mqtt; - -import java.io.IOException; - -/** - * Callback interface for sending a message to the MqttBrokerConnection. - * - * @deprecated - * @author Davy Vanherbergen - */ -@Deprecated -public interface MqttSenderChannel { - /** - * Send a message to the MQTT broker. Please do not use this interface anymore, but call - * {@see MqttBrokerConnection.publish()} instead. You will not get a notification if your - * message arrived the broker. - * - * @deprecated - * @param topic Topic to publish the message to. - * @param message message payload. - * @throws IOException if there is no broker connection - */ - @Deprecated - public void publish(String topic, byte[] message) throws IOException; -} diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java index 78fde8155db..975a47ad407 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/MqttService.java @@ -7,26 +7,28 @@ */ package org.eclipse.smarthome.io.transport.mqtt; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import javax.naming.ConfigurationException; -import org.apache.commons.lang.StringUtils; -import org.eclipse.smarthome.core.events.EventPublisher; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.smarthome.config.core.Configuration; +import org.eclipse.smarthome.io.transport.mqtt.internal.ConnectionConfiguration; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Modified; -import org.osgi.service.component.annotations.Reference; -import org.osgi.service.component.annotations.ReferenceCardinality; -import org.osgi.service.component.annotations.ReferencePolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,12 +44,10 @@ @Component(immediate = true, service = { MqttService.class }, configurationPid = { "org.eclipse.smarthome.mqtt" }, property = "service.pid=org.eclipse.smarthome.mqtt") public class MqttService { - private static final String NAME_PROPERTY = "name"; private final Logger logger = LoggerFactory.getLogger(MqttService.class); private final Map brokerConnections = new ConcurrentHashMap(); + private final Set serviceConfigurationConnections = new HashSet<>(); private final List brokersObservers = new CopyOnWriteArrayList<>(); - @Deprecated - private EventPublisher eventPublisher; /** * The expected service configuration looks like this: @@ -61,8 +61,9 @@ public class MqttService { * @param properties Service configuration * @return A 'list' of broker configurations as key-value maps. A configuration map at least contains a "name". */ - public Map> extractBrokerConfigurations(Map properties) { - Map> configPerBroker = new HashMap>(); + public Map<@NonNull String, @NonNull Map> extractBrokerConfigurations( + Map properties) { + Map<@NonNull String, @NonNull Map> configPerBroker = new HashMap<@NonNull String, @NonNull Map>(); for (Entry entry : properties.entrySet()) { String key = entry.getKey(); // ignore the non-broker properties @@ -71,26 +72,23 @@ public Map> extractBrokerConfigurations(Map brokerConfig = configPerBroker.get(brokername); + @Nullable + Map brokerConfig = configPerBroker.get(brokerID); if (brokerConfig == null) { brokerConfig = new HashMap<>(); - configPerBroker.put(brokername, brokerConfig); - brokerConfig.put(NAME_PROPERTY, brokername); + configPerBroker.put(brokerID, brokerConfig); } brokerConfig.put(subkeys[1], value); @@ -109,11 +107,8 @@ public void modified(Map config) { Iterator> it = brokerConnections.entrySet().iterator(); while (it.hasNext()) { Map.Entry entry = it.next(); - MqttBrokerConnection connection = entry.getValue(); - if (connection.isTextualConfiguredBroker()) { - logger.debug("Received new Mqtt configuration: Close connection to {}:{}", connection.getName(), - connection.getClientId()); - connection.close(); + if (serviceConfigurationConnections.remove(entry.getKey())) { + entry.getValue().stop(); it.remove(); } } @@ -123,12 +118,16 @@ public void modified(Map config) { return; } - Map> brokerConfigs = extractBrokerConfigurations(config); + Map<@NonNull String, @NonNull Map> brokerConfigs = extractBrokerConfigurations(config); - for (Map brokerConfig : brokerConfigs.values()) { + for (Entry<@NonNull String, @NonNull Map> entry : brokerConfigs.entrySet()) { try { - addBrokerConnection(brokerConfig).start(); - } catch (ConfigurationException e) { + @SuppressWarnings("null") + ConnectionConfiguration c = new Configuration(entry.getValue()).as(ConnectionConfiguration.class); + MqttBrokerConnection brokerConnection = addBrokerConnection(entry.getKey(), c); + serviceConfigurationConnections.add(entry.getKey()); + brokerConnection.start(); + } catch (ConfigurationException | IllegalArgumentException e) { logger.warn("MqttBroker connection configuration faulty: {}", e.getMessage()); } catch (MqttException e) { logger.warn("MqttBroker start failed: {}", e.getMessage(), e); @@ -145,9 +144,7 @@ public void activate(Map config) { @Deactivate public void deactivate() { logger.debug("Stopping MQTT Service..."); - for (final MqttBrokerConnection conn : brokerConnections.values()) { - conn.close(); - } + brokerConnections.forEach((id, c) -> c.stop()); brokerConnections.clear(); } @@ -182,33 +179,29 @@ public boolean hasBrokerObservers() { * @param brokerName to look for. * @return existing connection or null */ - public MqttBrokerConnection getBrokerConnection(String brokerName) { - synchronized (brokerConnections) { - return brokerConnections.get(brokerName.toLowerCase()); - } + public synchronized MqttBrokerConnection getBrokerConnection(String brokerName) { + return brokerConnections.get(brokerName); } /** - * Adds a broker to the service. The broker connection will not be altered (started/stopped), - * by adding it to the service. + * Adds a broker connection to the service. + * The broker connection state will not be altered (started/stopped). * - * The broker connection will be identified by its name. The name must be unique within the service. + * It is your responsibility to remove the broker connection again by calling + * removeBrokerConnection(brokerID). * + * @param brokerID The broker connection will be identified by this ID. The ID must be unique within the service. * @param connection The broker connection object * @return Return true if the connection could be added successfully, return false if there is already * an existing connection with the same name. */ - public boolean addBrokerConnection(MqttBrokerConnection connection) { - synchronized (brokerConnections) { - final String brokerID = connection.getName().toLowerCase(); - if (brokerConnections.containsKey(brokerID)) { - return false; - } - brokerConnections.put(brokerID, connection); - for (MqttBrokersObserver o : brokersObservers) { - o.brokerAdded(connection); - } + public synchronized boolean addBrokerConnection(@NonNull String brokerID, + @NonNull MqttBrokerConnection connection) { + if (brokerConnections.containsKey(brokerID)) { + return false; } + brokerConnections.put(brokerID, connection); + brokersObservers.forEach(o -> o.brokerAdded(connection)); return true; } @@ -217,212 +210,57 @@ public boolean addBrokerConnection(MqttBrokerConnection connection) { * Additional properties are "user","pwd","qos","retain","lwt","keepAlive","clientId", please read the * service configuration documentation for a detailed description. * - * @param brokerConnectionConfig The configuration key-value map. + * @param config The configuration key-value map. * @return Returns the created broker connection or null if there is already a connection with the same name. - * @throws ConfigurationException Most likely your provided name and url are invalid. + * @throws ConfigurationException Most likely your provided host is invalid. * @throws MqttException */ - public MqttBrokerConnection addBrokerConnection(Map brokerConnectionConfig) - throws ConfigurationException, MqttException { - // Extract mandatory fields - String brokerID = brokerConnectionConfig.get(NAME_PROPERTY); - if (StringUtils.isBlank(brokerID)) { - throw new ConfigurationException("MQTT Broker property 'name' is not provided"); - } - brokerID = brokerID.toLowerCase(); + public synchronized MqttBrokerConnection addBrokerConnection(@NonNull String brokerID, + @NonNull ConnectionConfiguration config) throws ConfigurationException, MqttException { - final String brokerURL = brokerConnectionConfig.get("url"); - if (StringUtils.isBlank(brokerURL)) { - throw new ConfigurationException("MQTT Broker property 'url' is not provided"); - } - // Add the connection - MqttBrokerConnection connection; - synchronized (brokerConnections) { - connection = brokerConnections.get(brokerID); - if (connection != null) { - return null; - } - connection = new MqttBrokerConnection(brokerID, brokerURL, true); - brokerConnections.put(brokerID, connection); + if (brokerConnections.containsKey(brokerID)) { + return null; } + MqttBrokerConnection connection = new MqttBrokerConnection(config.host, config.port.intValue(), config.secure); + brokerConnections.put(brokerID, connection); // Extract further configurations - connection.setCredentials(brokerConnectionConfig.get("user"), brokerConnectionConfig.get("pwd")); - connection.setClientId(brokerConnectionConfig.get("clientId")); - String property = brokerConnectionConfig.get("keepAlive"); - if (!StringUtils.isBlank(property)) { - connection.setKeepAliveInterval(Integer.valueOf(property)); - } - property = brokerConnectionConfig.get("qos"); - if (!StringUtils.isBlank(property)) { - connection.setQos(Integer.valueOf(property)); - } - property = brokerConnectionConfig.get("retain"); - if (!StringUtils.isBlank(property)) { - connection.setRetain(Boolean.valueOf(property)); + connection.setCredentials(config.username, config.password); + connection.setClientId(config.clientID); + if (config.keepAlive != null) { + connection.setKeepAliveInterval(config.keepAlive.intValue()); } - MqttWillAndTestament will = MqttWillAndTestament.fromString(brokerConnectionConfig.get("lwt")); + + connection.setQos(config.qos.intValue()); + connection.setRetain(config.retainMessages); + MqttWillAndTestament will = MqttWillAndTestament.fromString(config.lastWill); if (will != null) { logger.debug("Setting last will: {}", will); connection.setLastWill(will); } - for (MqttBrokersObserver o : brokersObservers) { - o.brokerAdded(connection); - } - + brokersObservers.forEach(o -> o.brokerAdded(connection)); return connection; } - /** - * Remove a broker connection - * - * @param connection The broker connection - */ - public void removeBrokerConnection(MqttBrokerConnection connection) { - synchronized (brokerConnections) { - if (brokerConnections.remove(connection.getName().toLowerCase(), connection)) { - for (MqttBrokersObserver o : brokersObservers) { - o.brokerRemoved(connection); - } - } - } - } - /** * Remove a broker connection by name * * @param brokerName The broker name * @return Returns the removed broker connection, or null if there was none with the given name. */ - public MqttBrokerConnection removeBrokerConnection(String brokerName) { - synchronized (brokerConnections) { - MqttBrokerConnection connection = brokerConnections.remove(brokerName.toLowerCase()); - if (connection != null) { - for (MqttBrokersObserver o : brokersObservers) { - o.brokerRemoved(connection); - } - } - return connection; + public synchronized MqttBrokerConnection removeBrokerConnection(String brokerName) { + final MqttBrokerConnection connection = brokerConnections.remove(brokerName); + if (connection != null) { + brokersObservers.forEach(o -> o.brokerRemoved(connection)); } + return connection; } /** - * Returns all currently configured brokers, textual as well as dynamically added ones. - */ - public Collection getAllBrokerConnections() { - return brokerConnections.values(); - } - - /** - * Register a new connection observer that could act on MQTT connection changes. - * This is deprecated, please register on the broker connection object instead. - * - * @deprecated - * @param brokerName Name of the broker that connection should be observed. - * @param connectionObserver The connection observer that should be informed about connection changes. - */ - @Deprecated - public void registerConnectionObserver(String brokerName, MqttConnectionObserver connectionObserver) { - MqttBrokerConnection brokerConnection = getBrokerConnection(brokerName); - if (brokerConnection != null) { - brokerConnection.addConnectionObserver(connectionObserver); - } - } - - /** - * Unregister an existing connection observer. - * - * @deprecated - * @param brokerName Name of the broker that connection has been observed. - * @param connectionObserver The connection observer that should not be informed anymore. - */ - @Deprecated - public void unregisterConnectionObserver(String brokerName, MqttConnectionObserver connectionObserver) { - MqttBrokerConnection brokerConnection = getBrokerConnection(brokerName); - if (brokerConnection != null) { - brokerConnection.removeConnectionObserver(connectionObserver); - } - } - - /** - * Register a new message consumer which can process messages received on - * - * @deprecated - * @param brokerName Name of the broker on which to listen for messages. - * @param mqttMessageConsumer Consumer which will process any received message. - */ - @Deprecated - public void registerMessageConsumer(String brokerName, MqttMessageConsumer mqttMessageConsumer) { - try { - MqttBrokerConnection brokerConnection = getBrokerConnection(brokerName); - if (brokerConnection != null) { - brokerConnection.addConsumer(mqttMessageConsumer); - mqttMessageConsumer.setEventPublisher(eventPublisher); - } - } catch (MqttException e) { - logger.debug("Consumer could not be activated", e); - } - } - - /** - * Unregisters an existing message consumer. - * - * @deprecated - * @param mqttMessageConsumer Consumer which needs to be unregistered. - */ - @Deprecated - public void unregisterMessageConsumer(String brokerName, MqttMessageConsumer mqttMessageConsumer) { - MqttBrokerConnection brokerConnection = getBrokerConnection(brokerName); - if (brokerConnection != null) { - brokerConnection.removeConsumer(mqttMessageConsumer); - } - } - - /** - * @deprecated - */ - @Deprecated - public void registerMessageProducer(String brokerName, MqttMessageProducer commandPublisher) { - MqttBrokerConnection brokerConnection = getBrokerConnection(brokerName); - if (brokerConnection != null) { - brokerConnection.addProducer(commandPublisher); - } - } - - /** - * @deprecated - */ - @Deprecated - public void unregisterMessageProducer(String brokerName, MqttMessageProducer commandPublisher) { - MqttBrokerConnection brokerConnection = getBrokerConnection(brokerName); - if (brokerConnection != null) { - brokerConnection.removeProducer(commandPublisher); - } - } - - /** - * Set the publisher to use for publishing SmartHome updates. - * This is deprecated, please use declarative services to add your - * own copy of EventPublisher to your bundle. - * - * @deprecated - * @param eventPublisher EventPublisher - */ - @Deprecated - @Reference(cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC) - public void setEventPublisher(EventPublisher eventPublisher) { - this.eventPublisher = eventPublisher; - } - - /** - * Remove the publisher to use for publishing SmartHome updates. - * - * @deprecated - * @param eventPublisher EventPublisher + * Returns all currently configured brokers of this service. */ - @Deprecated - public void unsetEventPublisher(EventPublisher eventPublisher) { - this.eventPublisher = null; + public synchronized Collection getAllBrokerConnections() { + return new ArrayList<>(brokerConnections.values()); } } diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ConnectionConfiguration.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ConnectionConfiguration.java new file mode 100644 index 00000000000..5b4f2fbfdc5 --- /dev/null +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/ConnectionConfiguration.java @@ -0,0 +1,27 @@ +package org.eclipse.smarthome.io.transport.mqtt.internal; + +import java.math.BigDecimal; + +import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection; + +/** + * MQTT broker connection configuration. The fields here also define + * the parameters that can be set in the configuration file of this service. + * + * @author David Graeff - Initial contribution + */ +public class ConnectionConfiguration { + public String host; + public BigDecimal port = BigDecimal.ZERO; + public Boolean secure = true; + + public String username; + public String password; + + public String clientID; + public BigDecimal qos = BigDecimal.valueOf(MqttBrokerConnection.DEFAULT_QOS); + public Boolean retainMessages = false; + + public String lastWill; + public BigDecimal keepAlive; +} diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttSenderChannelImpl.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttSenderChannelImpl.java deleted file mode 100644 index 07034ccb4c2..00000000000 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/internal/MqttSenderChannelImpl.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (c) 2014-2017 by the respective copyright holders. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - */ -package org.eclipse.smarthome.io.transport.mqtt.internal; - -import java.io.IOException; - -import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection; -import org.eclipse.smarthome.io.transport.mqtt.MqttException; -import org.eclipse.smarthome.io.transport.mqtt.MqttSenderChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * @author David Gräff - Initial contribution - */ -@Deprecated -public class MqttSenderChannelImpl implements MqttSenderChannel { - private final MqttBrokerConnection connection; - private final Logger logger = LoggerFactory.getLogger(MqttSenderChannelImpl.class); - - public MqttSenderChannelImpl(MqttBrokerConnection connection) { - this.connection = connection; - } - - @Override - public void publish(String topic, byte[] payload) throws IOException { - if (!connection.isConnected()) { - throw new IOException("No connection, can't publish messages"); - } - - try { - connection.publish(topic, payload, null); - } catch (MqttException e) { - logger.error("Could not publish message to topic {}", topic, e); - } - } -} \ No newline at end of file diff --git a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/reconnect/PeriodicReconnectStrategy.java b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/reconnect/PeriodicReconnectStrategy.java index 5d7d401efc7..c12234074d4 100644 --- a/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/reconnect/PeriodicReconnectStrategy.java +++ b/bundles/io/org.eclipse.smarthome.io.transport.mqtt/src/main/java/org/eclipse/smarthome/io/transport/mqtt/reconnect/PeriodicReconnectStrategy.java @@ -95,7 +95,7 @@ public synchronized void lostConnection() { return; } - logger.info("Try to restore connection to '{}' every {}ms", getBrokerConnection().getName(), + logger.info("Try to restore connection to '{}' every {}ms", getBrokerConnection().getHost(), getReconnectFrequency()); scheduledTask = scheduler.scheduleWithFixedDelay(() -> {