Skip to content
This repository has been archived by the owner on May 7, 2020. It is now read-only.

Commit

Permalink
allows to start-stop-start-stop-... a reconnect strategy (#4168)
Browse files Browse the repository at this point in the history
* allows to start-stop-start-stop-... a reconnect strategy

A MQTT Broker Connection must be able to handle start and stop
iterations.

Related to: #3980 (comment)
Signed-off-by: Markus Rathgeb <maggu2810@gmail.com>
  • Loading branch information
maggu2810 authored and kaikreuzer committed Sep 1, 2017
1 parent 484cc84 commit 0f334f0
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,11 @@ public synchronized void start() throws MqttException, ConfigurationException {
return;
}

// Ensure the reconnect strategy is started
if (reconnectStrategy != null) {
reconnectStrategy.start();
}

if (StringUtils.isBlank(clientId) || clientId.length() > 23) {
clientId = MqttClient.generateClientId();
}
Expand Down Expand Up @@ -579,15 +584,18 @@ public synchronized void start() throws MqttException, ConfigurationException {

/**
* Close the MQTT connection.
*
* You can re-establish a connection calling {@link #start()} again.
*/
public synchronized void close() {
logger.trace("Closing the MQTT broker connection '{}'", getName());

// Abort a connection attempt
isConnecting = false;

// Stop the reconnect strategy
if (reconnectStrategy != null) {
reconnectStrategy.close();
reconnectStrategy.stop();
}

// Close connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
*/
package org.eclipse.smarthome.io.transport.mqtt.reconnect;

import java.io.Closeable;

import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection;
import org.eclipse.smarthome.io.transport.mqtt.MqttConnectionObserver;

Expand All @@ -18,7 +16,7 @@
*
* @author David Graeff - Initial contribution
*/
public abstract class AbstractReconnectStrategy implements Closeable {
public abstract class AbstractReconnectStrategy {
protected MqttBrokerConnection brokerConnection;

/**
Expand Down Expand Up @@ -56,8 +54,16 @@ public MqttBrokerConnection getBrokerConnection() {
public abstract void connectionEstablished();

/**
* This will be called if your reconnect strategy should no longer try to handle a reconnect.
* Start the reconnect strategy handling.
*/
public abstract void start();

/**
* Stop the reconnect strategy handling.
*
* <p>
* It must be possible to restart a reconnect strategy again after it has been stopped.
*/
@Override
public abstract void close();
public abstract void stop();

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ public class PeriodicReconnectStrategy extends AbstractReconnectStrategy {
private final int reconnectFrequency;
private final int firstReconnectAfter;

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private ScheduledExecutorService scheduler = null;
private ScheduledFuture<?> scheduledTask;
boolean closed = false;

/**
* Use a default 60s reconnect frequency and try the first reconnect after 10s.
Expand All @@ -54,10 +53,40 @@ public PeriodicReconnectStrategy(int reconnectFrequency, int firstReconnectAfter
this.firstReconnectAfter = firstReconnectAfter;
}

@Override
public synchronized void start() {
if (scheduler == null) {
scheduler = Executors.newScheduledThreadPool(1);
}
}

@Override
public synchronized void stop() {
if (scheduler != null) {
scheduler.shutdownNow();
scheduler = null;
}

// If there is a scheduled task ensure it is canceled.
if (scheduledTask != null) {
scheduledTask.cancel(true);
scheduledTask = null;
}
}

/**
* Returns if the reconnect strategy has been started.
*
* @return true if started
*/
public synchronized boolean isStarted() {
return scheduler != null;
}

@Override
public synchronized void lostConnection() {
// If we has been already closed, there is nothing to do.
if (closed) {
// Check if we are running (has been started and not stopped) state.
if (scheduler == null) {
return;
}

Expand All @@ -80,6 +109,7 @@ public synchronized void lostConnection() {

@Override
public synchronized void connectionEstablished() {
// Stop the reconnect task if existing.
if (scheduledTask != null) {
scheduledTask.cancel(true);
scheduledTask = null;
Expand All @@ -98,18 +128,4 @@ public int getReconnectFrequency() {
public int getFirstReconnectAfter() {
return firstReconnectAfter;
}

@Override
public synchronized void close() {
closed = true;

// Shutdown the scheduler.
scheduler.shutdownNow();

// If there is a scheduled task ensure it is canceled.
if (scheduledTask != null) {
scheduledTask.cancel(true);
scheduledTask = null;
}
}
}

0 comments on commit 0f334f0

Please sign in to comment.